mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 21:59:53 +05:00
Removed sm_opensips out of engine
This commit is contained in:
committed by
Dan Christian Bogos
parent
2d675dbe05
commit
e29129c3ca
@@ -343,35 +343,6 @@ func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rp
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
var err error
|
||||
utils.Logger.Info("Starting CGRateS SMOpenSIPS service.")
|
||||
var ralsConn, cdrsConn *rpcclient.RpcClientPool
|
||||
if len(cfg.SmOsipsConfig.RALsConns) != 0 {
|
||||
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to RALs: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.SmOsipsConfig.CDRsConns) != 0 {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SmOsipsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to CDRs: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, ralsConn, cdrsConn, cfg.DefaultTimezone)
|
||||
if err := sm.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
|
||||
cdrDb engine.CdrStorage, dm *engine.DataManager,
|
||||
internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan,
|
||||
@@ -939,11 +910,6 @@ func main() {
|
||||
go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
|
||||
}
|
||||
|
||||
// Start SM-OpenSIPS
|
||||
if cfg.SmOsipsConfig.Enabled {
|
||||
go startSmOpenSIPS(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
|
||||
}
|
||||
|
||||
if cfg.AsteriskAgentCfg().Enabled {
|
||||
go startAsteriskAgent(internalSMGChan, exitChan)
|
||||
}
|
||||
|
||||
@@ -720,11 +720,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
jsnSmOsipsCfg, err := jsnCfg.SmOsipsJsonCfg()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jsnSMAstCfg, err := jsnCfg.AsteriskAgentJsonCfg()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1198,12 +1193,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if jsnSmOsipsCfg != nil {
|
||||
if err := self.SmOsipsConfig.loadFromJsonCfg(jsnSmOsipsCfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if jsnSMAstCfg != nil {
|
||||
if err := self.asteriskAgentCfg.loadFromJsonCfg(jsnSMAstCfg); err != nil {
|
||||
return err
|
||||
|
||||
@@ -352,25 +352,6 @@ const CGRATES_CFG_JSON = `
|
||||
],
|
||||
},
|
||||
|
||||
|
||||
"sm_opensips": {
|
||||
"enabled": false, // starts SessionManager service: <true|false>
|
||||
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
|
||||
"rals_conns": [
|
||||
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
|
||||
],
|
||||
"cdrs_conns": [
|
||||
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
|
||||
],
|
||||
"create_cdr": false, // create CDR out of events and sends it to CDRS component
|
||||
"debit_interval": "10s", // interval to perform debits on.
|
||||
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
|
||||
"max_call_duration": "3h", // maximum call duration a prepaid call can last
|
||||
"events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
|
||||
"mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
|
||||
},
|
||||
|
||||
|
||||
"diameter_agent": {
|
||||
"enabled": false, // enables the diameter agent: <true|false>
|
||||
"listen": "127.0.0.1:3868", // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
|
||||
@@ -45,7 +45,6 @@ const (
|
||||
SessionSJson = "sessions"
|
||||
FreeSWITCHAgentJSN = "freeswitch_agent"
|
||||
SMKAM_JSN = "sm_kamailio"
|
||||
SMOSIPS_JSN = "sm_opensips"
|
||||
AsteriskAgentJSN = "asterisk_agent"
|
||||
SM_JSN = "session_manager"
|
||||
FS_JSN = "freeswitch"
|
||||
@@ -270,18 +269,6 @@ func (self CgrJsonCfg) SmKamJsonCfg() (*SmKamJsonCfg, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) SmOsipsJsonCfg() (*SmOsipsJsonCfg, error) {
|
||||
rawCfg, hasKey := self[SMOSIPS_JSN]
|
||||
if !hasKey {
|
||||
return nil, nil
|
||||
}
|
||||
cfg := new(SmOsipsJsonCfg)
|
||||
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) AsteriskAgentJsonCfg() (*AsteriskAgentJsonCfg, error) {
|
||||
rawCfg, hasKey := self[AsteriskAgentJSN]
|
||||
if !hasKey {
|
||||
|
||||
@@ -572,32 +572,6 @@ func TestSmKamJsonCfg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSmOsipsJsonCfg(t *testing.T) {
|
||||
eCfg := &SmOsipsJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Listen_udp: utils.StringPointer("127.0.0.1:2020"),
|
||||
Rals_conns: &[]*HaPoolJsonCfg{
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Cdrs_conns: &[]*HaPoolJsonCfg{
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Create_cdr: utils.BoolPointer(false),
|
||||
Debit_interval: utils.StringPointer("10s"),
|
||||
Min_call_duration: utils.StringPointer("0s"),
|
||||
Max_call_duration: utils.StringPointer("3h"),
|
||||
Events_subscribe_interval: utils.StringPointer("60s"),
|
||||
Mi_addr: utils.StringPointer("127.0.0.1:8020"),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.SmOsipsJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
t.Error("Received: ", cfg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsteriskAgentJsonCfg(t *testing.T) {
|
||||
eCfg := &AsteriskAgentJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
|
||||
@@ -588,25 +588,6 @@ func TestCgrCfgJSONDefaultsSMKamConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgrCfgJSONDefaultsSMOsipsConfig(t *testing.T) {
|
||||
eSmOpCfg := &SmOsipsConfig{
|
||||
Enabled: false,
|
||||
ListenUdp: "127.0.0.1:2020",
|
||||
RALsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}},
|
||||
CDRsConns: []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}},
|
||||
CreateCdr: false,
|
||||
DebitInterval: 10 * time.Second,
|
||||
MinCallDuration: 0 * time.Second,
|
||||
MaxCallDuration: 3 * time.Hour,
|
||||
EventsSubscribeInterval: 60 * time.Second,
|
||||
MiAddr: "127.0.0.1:8020",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(cgrCfg.SmOsipsConfig, eSmOpCfg) {
|
||||
t.Errorf("received: %+v, expecting: %+v", cgrCfg.SmOsipsConfig, eSmOpCfg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgrCfgJSONDefaultssteriskAgentCfg(t *testing.T) {
|
||||
eAstAgentCfg := &AsteriskAgentCfg{
|
||||
Enabled: false,
|
||||
|
||||
@@ -300,24 +300,6 @@
|
||||
// },
|
||||
|
||||
|
||||
// "sm_opensips": {
|
||||
// "enabled": false, // starts SessionManager service: <true|false>
|
||||
// "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
|
||||
// "rals_conns": [
|
||||
// {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
|
||||
// ],
|
||||
// "cdrs_conns": [
|
||||
// {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
|
||||
// ],
|
||||
// "create_cdr": false, // create CDR out of events and sends it to CDRS component
|
||||
// "debit_interval": "10s", // interval to perform debits on.
|
||||
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
|
||||
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
|
||||
// "events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
|
||||
// "mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
|
||||
// },
|
||||
|
||||
|
||||
// "diameter_agent": {
|
||||
// "enabled": false, // enables the diameter agent: <true|false>
|
||||
// "listen": "127.0.0.1:3868", // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
|
||||
@@ -35,12 +35,4 @@
|
||||
"enabled": true, // starts the cdrstats service: <true|false>
|
||||
},
|
||||
|
||||
"sm_opensips": {
|
||||
"enabled": true, // starts SessionManager service: <true|false>
|
||||
"listen_udp": ":2020", // address where to listen for datagram events coming from OpenSIPS
|
||||
"create_cdr": true, // create CDR out of events and sends them to CDRS component
|
||||
"debit_interval": "5s", // interval to perform debits on.
|
||||
"mi_addr": "192.168.56.128:8020", // address where to reach OpenSIPS MI to send session disconnects
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
@@ -1,351 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/osipsdagram"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
/*
|
||||
E_ACC_EVENT
|
||||
method::INVITE
|
||||
from_tag::87d02470
|
||||
to_tag::a671a98
|
||||
callid::05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0
|
||||
sip_code::200
|
||||
sip_reason::OK
|
||||
time::1430579770
|
||||
cgr_reqtype::*pseudoprepaid
|
||||
cgr_account::1002
|
||||
cgr_subject::1002
|
||||
cgr_destination::1002
|
||||
originalUri::sip:1002@172.16.254.77
|
||||
duration::
|
||||
|
||||
#
|
||||
E_ACC_EVENT
|
||||
method::BYE
|
||||
from_tag::a671a98
|
||||
to_tag::87d02470
|
||||
callid::05dac0aaa716c9814f855f0e8fee6936@0:0:0:0:0:0:0:0
|
||||
sip_code::200
|
||||
sip_reason::OK
|
||||
time::1430579797
|
||||
cgr_reqtype::
|
||||
cgr_account::
|
||||
cgr_subject::
|
||||
cgr_destination::
|
||||
originalUri::sip:1002@172.16.254.1:5060;transport=udp;registering_acc=172_16_254_77
|
||||
duration::
|
||||
|
||||
E_ACC_MISSED_EVENT
|
||||
method::INVITE
|
||||
from_tag::1d5efcc1
|
||||
to_tag::
|
||||
callid::c0965d3f42c720397ca1a5be9619c2ef@0:0:0:0:0:0:0:0
|
||||
sip_code::404
|
||||
sip_reason::Not Found
|
||||
time::1430579759
|
||||
cgr_reqtype::*pseudoprepaid
|
||||
cgr_account::1002
|
||||
cgr_subject::1002
|
||||
cgr_destination::1002
|
||||
originalUri::sip:1002@172.16.254.77
|
||||
duration::
|
||||
|
||||
*/
|
||||
|
||||
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv rpcclient.RpcClientConnection, timezone string) (*OsipsSessionManager, error) {
|
||||
osm := &OsipsSessionManager{cfg: smOsipsCfg, reconnects: reconnects, rater: rater, cdrsrv: cdrsrv, timezone: timezone, cdrStartEvents: make(map[string]*OsipsEvent), sessions: NewSessions()}
|
||||
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
|
||||
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
|
||||
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured
|
||||
"E_ACC_MISSED_EVENT": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if evi_missed_flag is configured
|
||||
"E_ACC_EVENT": []func(*osipsdagram.OsipsEvent){osm.onAccEvent}, // Raised if evi_flag is configured and not cdr_flag containing start/stop events
|
||||
}
|
||||
return osm, nil
|
||||
}
|
||||
|
||||
type OsipsSessionManager struct {
|
||||
cfg *config.SmOsipsConfig
|
||||
reconnects int
|
||||
rater rpcclient.RpcClientConnection
|
||||
cdrsrv rpcclient.RpcClientConnection
|
||||
timezone string
|
||||
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
|
||||
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
|
||||
stopServing chan struct{} // Stop serving datagrams
|
||||
miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote
|
||||
sessions *Sessions
|
||||
cdrStartEvents map[string]*OsipsEvent // Used when building CDRs, ToDo: secure access to map
|
||||
cdrSEMux sync.RWMutex
|
||||
}
|
||||
|
||||
// Called when firing up the session manager, will stay connected for the duration of the daemon running
|
||||
func (osm *OsipsSessionManager) Connect() (err error) {
|
||||
osm.stopServing = make(chan struct{})
|
||||
if osm.miConn, err = osipsdagram.NewOsipsMiDatagramConnector(osm.cfg.MiAddr, osm.reconnects); err != nil {
|
||||
return fmt.Errorf("Cannot connect to OpenSIPS at %s, error: %s", osm.cfg.MiAddr, err.Error())
|
||||
}
|
||||
osm.evSubscribeStop = make(chan struct{})
|
||||
defer func() { osm.evSubscribeStop <- struct{}{} }() // Stop subscribing on disconnect
|
||||
go osm.SubscribeEvents(osm.evSubscribeStop)
|
||||
evsrv, err := osipsdagram.NewEventServer(osm.cfg.ListenUdp, osm.eventHandlers)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Cannot initialize datagram server, error: <%s>", err.Error()))
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<SM-OpenSIPS> Listening for datagram events at <%s>", osm.cfg.ListenUdp))
|
||||
evsrv.ServeEvents(osm.stopServing) // Will break through stopServing on error in other places
|
||||
return errors.New("<SM-OpenSIPS> Stopped reading events")
|
||||
}
|
||||
|
||||
// DebitInterval will give out the frequence of the debits sent to engine
|
||||
func (osm *OsipsSessionManager) DebitInterval() time.Duration {
|
||||
return osm.cfg.DebitInterval
|
||||
}
|
||||
|
||||
// Returns the connection to local cdr database, used by session to log it's final costs
|
||||
func (osm *OsipsSessionManager) CdrSrv() rpcclient.RpcClientConnection {
|
||||
return osm.cdrsrv
|
||||
}
|
||||
|
||||
// Returns connection to rater/controller
|
||||
func (osm *OsipsSessionManager) Rater() rpcclient.RpcClientConnection {
|
||||
return osm.rater
|
||||
}
|
||||
|
||||
// Part of the session manager interface, not really used with OpenSIPS now
|
||||
func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
|
||||
return
|
||||
}
|
||||
|
||||
// Called on session manager shutdown, could add more cleanup actions in the future
|
||||
func (osm *OsipsSessionManager) Shutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process the CDR with CDRS component
|
||||
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.CDR) error {
|
||||
var reply string
|
||||
return osm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply)
|
||||
}
|
||||
|
||||
// Disconnects the session
|
||||
func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
|
||||
sessionIds := ev.GetSessionIds()
|
||||
if len(sessionIds) != 2 {
|
||||
errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> " + errMsg))
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1])
|
||||
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed disconnecting session for event: %+v, notify: %s, dialogId: %v, error: <%s>", ev, notify, sessionIds, err))
|
||||
return err
|
||||
} else if !bytes.HasPrefix(reply, []byte("200 OK")) {
|
||||
errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
|
||||
utils.Logger.Err("<SM-OpenSIPS> " + errStr)
|
||||
return errors.New(errStr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart
|
||||
func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
|
||||
if err := osm.subscribeEvents(); err != nil { // Init subscribe
|
||||
close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-evStop: // Break this loop from outside
|
||||
return nil
|
||||
case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval
|
||||
if err := osm.subscribeEvents(); err != nil {
|
||||
close(osm.stopServing) // Order stop serving, do not return here since we will block the channel consuming
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// One subscribe attempt to OpenSIPS
|
||||
func (osm *OsipsSessionManager) subscribeEvents() error {
|
||||
subscribeInterval := osm.cfg.EventsSubscribeInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
|
||||
listenAddrSplt := strings.Split(osm.cfg.ListenUdp, ":")
|
||||
portListen := listenAddrSplt[1]
|
||||
addrListen := listenAddrSplt[0]
|
||||
if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
|
||||
if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
|
||||
addrListen = strings.Split(localAddr.String(), ":")[0]
|
||||
}
|
||||
}
|
||||
for eventName := range osm.eventHandlers {
|
||||
if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
|
||||
continue
|
||||
}
|
||||
cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
|
||||
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed subscribing to OpenSIPS at address: <%s>, error: <%s>", osm.cfg.MiAddr, err))
|
||||
return err
|
||||
} else if !bytes.HasPrefix(reply, []byte("200 OK")) {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed subscribing to OpenSIPS at address: <%s>", osm.cfg.MiAddr))
|
||||
return errors.New("Failed subscribing to OpenSIPS events")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Triggered opensips_start event
|
||||
func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEvent) {
|
||||
osm.evSubscribeStop <- struct{}{} // Cancel previous subscribes
|
||||
osm.evSubscribeStop = make(chan struct{}) // Create a fresh communication channel
|
||||
go osm.SubscribeEvents(osm.evSubscribeStop)
|
||||
}
|
||||
|
||||
// Triggered by CDR event
|
||||
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
|
||||
osipsEv, _ := NewOsipsEvent(cdrDagram)
|
||||
if err := osm.ProcessCdr(osipsEv.AsCDR(osm.timezone)); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(osm.timezone), osipsEv.GetUUID(), err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// Triggered by ACC_EVENT
|
||||
func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) {
|
||||
osipsEv, _ := NewOsipsEvent(osipsDgram)
|
||||
if osipsEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
|
||||
return
|
||||
}
|
||||
if osipsDgram.AttrValues["method"] == "INVITE" { // Call start
|
||||
if err := osm.callStart(osipsEv); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_START out of %+v, error: <%s>", osipsDgram, err.Error()))
|
||||
}
|
||||
if err := osm.processCdrStart(osipsEv); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing cdr start out of %+v, error: <%s>", osipsDgram, err.Error()))
|
||||
}
|
||||
} else if osipsDgram.AttrValues["method"] == "BYE" {
|
||||
if err := osm.callEnd(osipsEv); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_END out of %+v, error: <%s>", osipsDgram, err.Error()))
|
||||
}
|
||||
if err := osm.processCdrStop(osipsEv); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing cdr stop out of %+v, error: <%s>", osipsDgram, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handler of call start event. Mostly starts a session if needed
|
||||
func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error {
|
||||
if osipsEv.MissingParameter(osm.timezone) {
|
||||
if err := osm.DisconnectSession(osipsEv, "", utils.ErrMandatoryIeMissing.Error()); err != nil {
|
||||
return err
|
||||
}
|
||||
return utils.ErrMandatoryIeMissing
|
||||
}
|
||||
s := NewSession(osipsEv, "", osm)
|
||||
if s != nil {
|
||||
osm.sessions.indexSession(s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handler for callEnd. Mostly removes a session if needed
|
||||
func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error {
|
||||
s := osm.sessions.getSession(osipsEv.GetUUID())
|
||||
if s == nil { // Not handled by us
|
||||
return nil
|
||||
}
|
||||
origEvent := s.eventStart.(*OsipsEvent) // Need a complete event for methods in close
|
||||
if err := origEvent.updateDurationFromEvent(osipsEv); err != nil {
|
||||
return err
|
||||
}
|
||||
if origEvent.MissingParameter(osm.timezone) {
|
||||
return utils.ErrMandatoryIeMissing
|
||||
}
|
||||
if err := osm.sessions.removeSession(s, origEvent); err != nil { // Unreference it early so we avoid concurrency
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Records the event start in case of received so we can create CDR out of it
|
||||
func (osm *OsipsSessionManager) processCdrStart(osipsEv *OsipsEvent) error {
|
||||
if !osm.cfg.CreateCdr {
|
||||
return nil
|
||||
}
|
||||
if dialogId := osipsEv.DialogId(); dialogId == "" {
|
||||
return errors.New("Missing dialog_id")
|
||||
} else {
|
||||
osm.cdrSEMux.Lock()
|
||||
osm.cdrStartEvents[dialogId] = osipsEv
|
||||
osm.cdrSEMux.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processCdrStop builds the complete CDR out of eventStart+eventStop and sends it to the CDRS component
|
||||
func (osm *OsipsSessionManager) processCdrStop(osipsEv *OsipsEvent) error {
|
||||
if osm.cdrsrv == nil {
|
||||
return nil
|
||||
}
|
||||
osm.cdrSEMux.Lock()
|
||||
defer osm.cdrSEMux.Unlock()
|
||||
var osipsEvStart *OsipsEvent
|
||||
var hasIt bool
|
||||
dialogId := osipsEv.DialogId()
|
||||
if dialogId == "" {
|
||||
return errors.New("Missing dialog_id")
|
||||
}
|
||||
osm.cdrSEMux.RLock()
|
||||
osipsEvStart, hasIt = osm.cdrStartEvents[dialogId]
|
||||
osm.cdrSEMux.RUnlock()
|
||||
if !hasIt {
|
||||
return errors.New("Missing event start info")
|
||||
}
|
||||
osm.cdrSEMux.Lock()
|
||||
delete(osm.cdrStartEvents, dialogId) // Cleanup the event once we got it
|
||||
osm.cdrSEMux.Unlock()
|
||||
if err := osipsEvStart.updateDurationFromEvent(osipsEv); err != nil {
|
||||
return err
|
||||
}
|
||||
return osm.ProcessCdr(osipsEvStart.AsCDR(osm.timezone))
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) Sessions() []*Session {
|
||||
return osm.sessions.getSessions()
|
||||
}
|
||||
|
||||
// Sync sessions with FS
|
||||
func (osm *OsipsSessionManager) SyncSessions() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) Timezone() string {
|
||||
return osm.timezone
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestOsipsSMInterface(t *testing.T) {
|
||||
var _ SessionManager = SessionManager(new(OsipsSessionManager))
|
||||
}
|
||||
Reference in New Issue
Block a user