Refactored FreeSWITCH SessionManager to make use of multiple connections, give up sharing of configuration at package level, make better use of interfaces to communicate with Sessions

This commit is contained in:
DanB
2015-03-06 17:30:12 +01:00
parent 9301918159
commit cb2ab3224b
8 changed files with 147 additions and 193 deletions

View File

@@ -188,8 +188,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
switch cfg.SMSwitchType {
case FS:
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp)
sm = sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
case KAMAILIO:
var debitInterval time.Duration
if debitInterval, err = utils.ParseDurationWithSecs(strconv.Itoa(cfg.SMDebitInterval)); err != nil {

View File

@@ -417,7 +417,7 @@ Task-ID: 2
Task-Desc: heartbeat
Task-Group: core
Task-Runtime: 1349437318`
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(body)
setupTime, _ := ev.GetSetupTime("Event-Date-Local")
@@ -471,7 +471,7 @@ Task-Runtime: 1349437318`
}
func TestParseFsHangup(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
setupTime, _ := ev.GetSetupTime(utils.META_DEFAULT)
@@ -502,7 +502,7 @@ func TestParseFsHangup(t *testing.T) {
}
func TestParseEventValue(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
if cgrid := ev.ParseEventValue(&utils.RSRField{Id: utils.CGRID}); cgrid != "873e5bf7903978f305f7d8fed3f92f968cf82873" {
@@ -613,7 +613,7 @@ Caller-Username: 04021292812`
}
func TestFsEvAsStoredCdr(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
setupTime, _ := utils.ParseTimeDetectLayout("1398442107")
@@ -629,7 +629,7 @@ func TestFsEvAsStoredCdr(t *testing.T) {
}
func TestFsEvGetExtraFields(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
cfg.FSCdrExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}}
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)

View File

@@ -19,11 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"bufio"
"errors"
"fmt"
"log/syslog"
"net"
"time"
"github.com/cgrates/cgrates/config"
@@ -32,62 +30,63 @@ import (
"github.com/cgrates/fsock"
)
var cfg *config.CGRConfig // Share the configuration with the rest of the package
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
conn net.Conn
buf *bufio.Reader
sessions []*Session
rater engine.Connector
cdrs engine.Connector
debitPeriod time.Duration
loggerDB engine.LogStorage
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
sessions []*Session
rater engine.Connector
cdrs engine.Connector
loggerDB engine.LogStorage
}
func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, rater, cdrs engine.Connector, debitPeriod time.Duration) *FSSessionManager {
cfg = cgrCfg // make config global
return &FSSessionManager{loggerDB: storage, rater: rater, cdrs: cdrs, debitPeriod: debitPeriod}
func NewFSSessionManager(smFsConfig *config.SmFsConfig, storage engine.LogStorage, rater, cdrs engine.Connector) *FSSessionManager {
return &FSSessionManager{cfg: smFsConfig, conns: make(map[string]*fsock.FSock), loggerDB: storage, rater: rater, cdrs: cdrs}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events.
func (sm *FSSessionManager) Connect() (err error) {
func (sm *FSSessionManager) Connect() error {
eventFilters := map[string]string{"Call-Direction": "inbound"}
if fsock.FS, err = fsock.NewFSock(cfg.FreeswitchServer, cfg.FreeswitchPass, cfg.FreeswitchReconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer)); err != nil {
return err
} else if !fsock.FS.Connected() {
return errors.New("Cannot connect to FreeSWITCH")
errChan := make(chan error)
for _, connCfg := range sm.cfg.Connections {
connId := utils.GenUUID()
fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer), connId)
if err != nil {
errChan <- err
} else if !fSock.Connected() {
errChan <- errors.New("Could not connect to FreeSWITCH")
} else {
sm.conns[connId] = fSock
}
go func() { // Start reading in own goroutine, return on error
if err := fsock.FS.ReadEvents(); err != nil {
errChan <- err
}
}()
}
if err := fsock.FS.ReadEvents(); err != nil {
return err
}
return errors.New("<SessionManager> - Stopped reading events")
err := <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
}
func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string)) {
hb := func(body string) {
func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string, string)) {
cp := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.OnHeartBeat(ev)
sm.onChannelPark(ev, connId)
}
cp := func(body string) {
ca := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.OnChannelPark(ev)
sm.onChannelAnswer(ev, connId)
}
ca := func(body string) {
ch := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
sm.OnChannelAnswer(ev)
sm.onChannelHangupComplete(ev)
}
ch := func(body string) {
ev := new(FSEvent).AsEvent(body)
sm.OnChannelHangupComplete(ev)
}
return map[string][]func(string){
"HEARTBEAT": []func(string){hb},
"CHANNEL_PARK": []func(string){cp},
"CHANNEL_ANSWER": []func(string){ca},
"CHANNEL_HANGUP_COMPLETE": []func(string){ch},
return map[string][]func(string, string){
"CHANNEL_PARK": []func(string, string){cp},
"CHANNEL_ANSWER": []func(string, string){ca},
"CHANNEL_HANGUP_COMPLETE": []func(string, string){ch},
}
}
@@ -102,25 +101,25 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(ev utils.Event, notify string) {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect api notification to freeswitch: %s", err.Error()))
func (sm *FSSessionManager) DisconnectSession(ev utils.Event, connId, notify string) {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
if notify == INSUFFICIENT_FUNDS {
if len(cfg.FSEmptyBalanceContext) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), cfg.FSEmptyBalanceContext)); err != nil {
engine.Logger.Err("<SessionManager> Could not transfer the call to empty balance context")
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId))
}
return
} else if len(cfg.FSEmptyBalanceAnnFile) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), cfg.FSEmptyBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send uuid_broadcast to freeswitch: %s", err.Error()))
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
return
}
}
if err := fsock.FS.SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect msg to freeswitch: %v", err))
if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
return
}
@@ -136,112 +135,54 @@ func (sm *FSSessionManager) RemoveSession(uuid string) {
}
// Sets the call timeout valid of starting of the call
func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error {
func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration) error {
// _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid))
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds())))
_, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds())))
if err != nil {
engine.Logger.Err("could not send sched_hangup command to freeswitch")
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
// Sends the transfer command to unpark the call to freeswitch
func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) {
_, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
engine.Logger.Err("<SessionManager> Could not send unpark api notification to freeswitch")
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
if _, err = fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
engine.Logger.Err("<SessionManager> Could not send unpark api call to freeswitch")
if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send unpark api call to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
}
func (sm *FSSessionManager) OnHeartBeat(ev utils.Event) {
engine.Logger.Info("freeswitch ♥")
}
func (sm *FSSessionManager) OnChannelPark(ev utils.Event) {
var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
var durInitialized bool
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
return
func (sm *FSSessionManager) onChannelPark(ev utils.Event, connId string) {
var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last
if err := sm.rater.GetDerivedMaxSessionTime(ev, &maxCallDuration); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error()))
}
dcs, _ = dcs.AppendDefaultRun()
for _, dc := range dcs {
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
matchingAllFilters := true
for _, dcRunFilter := range runFilters {
if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
matchingAllFilters = false
break
}
}
if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
continue
}
startTime, err := ev.GetAnswerTime(PARK_TIME)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER)
engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime,
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = sm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination
return
}
remainingDuration := time.Duration(remainingDurationFloat)
// Set maxCallDuration, smallest out of all forked sessions
if !durInitialized { // first time we set it /not initialized yet
maxCallDuration = remainingDuration
durInitialized = true
} else if maxCallDuration > remainingDuration {
maxCallDuration = remainingDuration
}
}
if maxCallDuration <= cfg.SMMinCallDuration {
maxCallDur := time.Duration(maxCallDuration)
if maxCallDur <= sm.cfg.MinCallDuration {
//engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
return
}
sm.setMaxCallDuration(ev.GetUUID(), maxCallDuration)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur)
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
}
func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) {
func (sm *FSSessionManager) onChannelAnswer(ev utils.Event, connId string) {
if ev.MissingParameter() {
sm.DisconnectSession(ev, MISSING_PARAMETER)
sm.DisconnectSession(ev, connId, MISSING_PARAMETER)
}
s := NewSession(ev, sm)
s := NewSession(ev, connId, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
}
}
func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
go sm.processCdr(ev.AsStoredCdr())
func (sm *FSSessionManager) onChannelHangupComplete(ev utils.Event) {
go sm.ProcessCdr(ev.AsStoredCdr())
s := sm.GetSession(ev.GetUUID())
if s == nil { // Not handled by us
return
@@ -252,7 +193,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
}
}
func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error {
func (sm *FSSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error {
if sm.cdrs != nil {
var reply string
if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil {
@@ -263,15 +204,10 @@ func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error {
return nil
}
func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
return sm.debitPeriod
func (sm *FSSessionManager) DebitInterval() time.Duration {
return sm.cfg.DebitInterval
}
func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return sm.rater.MaxDebit(*cd, cc)
}
func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {
func (sm *FSSessionManager) DbLogger() engine.LogStorage {
return sm.loggerDB
}
@@ -279,17 +215,27 @@ func (sm *FSSessionManager) Rater() engine.Connector {
return sm.rater
}
func (sm *FSSessionManager) Shutdown() (err error) {
if fsock.FS == nil || !fsock.FS.Connected() {
return errors.New("Cannot shutdown sessions, fsock not connected")
// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message
func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: %s, connection id: %s", err.Error(), connId))
}
engine.Logger.Info("Shutting down all sessions...")
if _, err = fsock.FS.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err))
}
func (sm *FSSessionManager) Shutdown() (err error) {
for connId, fSock := range sm.conns {
if !fSock.Connected() {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Cannot shutdown sessions, fsock not connected for connection id: %s", connId))
continue
}
engine.Logger.Info(fmt.Sprintf("<SM-FreeSWITCH> Shutting down all sessions on connection id: %s", connId))
if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on calls shutdown: %s, connection id: %s", err.Error(), connId))
}
}
for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ {
time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired
engine.Logger.Info(fmt.Sprintf("<SessionManager> Shutdown waiting on sessions: %v", sm.sessions))
engine.Logger.Info(fmt.Sprintf("<SM-FreeSWITC> Shutdown waiting on sessions: %v", sm.sessions))
}
return
return nil
}

View File

@@ -75,10 +75,10 @@ func (self *KamailioSessionManager) onCallStart(evData []byte) {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kamEv.MissingParameter() {
self.DisconnectSession(kamEv, utils.ERR_MANDATORY_IE_MISSING)
self.DisconnectSession(kamEv, "", utils.ERR_MANDATORY_IE_MISSING)
return
}
s := NewSession(kamEv, self)
s := NewSession(kamEv, "", self)
if s != nil {
self.sessions = append(self.sessions, s)
}
@@ -119,7 +119,7 @@ func (self *KamailioSessionManager) Connect() error {
return errors.New("<SM-Kamailio> Stopped reading events")
}
func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, notify string) {
func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, connId, notify string) {
sessionIds := ev.GetSessionIds()
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
if err := self.kea.Send(disconnectEv.String()); err != nil {
@@ -149,24 +149,28 @@ func (self *KamailioSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engi
return self.rater.MaxDebit(*cd, cc)
}
func (self *KamailioSessionManager) GetDebitPeriod() time.Duration {
func (self *KamailioSessionManager) DebitInterval() time.Duration {
return self.debitInterval
}
func (self *KamailioSessionManager) GetDbLogger() engine.LogStorage {
func (self *KamailioSessionManager) DbLogger() engine.LogStorage {
return self.loggerDb
}
func (self *KamailioSessionManager) Rater() engine.Connector {
return self.rater
}
func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) {
func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) error {
if self.cdrsrv == nil {
return
return nil
}
var reply string
if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error()))
}
return nil
}
func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
}
func (self *KamailioSessionManager) Shutdown() error {
return nil

View File

@@ -68,7 +68,7 @@ func TestOsipsEventParseStatic(t *testing.T) {
}
func TestOsipsEventGetValues(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
setupTime, _ := osipsEv.GetSetupTime(utils.META_DEFAULT)
eSetupTime, _ := utils.ParseTimeDetectLayout("1406370492")

View File

@@ -35,7 +35,7 @@ func NewOSipsSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connecto
osm := &OsipsSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr},
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.OnAuthorize},
}
return osm, nil
@@ -70,7 +70,7 @@ func (osm *OsipsSessionManager) Connect() (err error) {
return errors.New("<SM-OpenSIPS> Stopped reading events")
}
func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, notify string) {
func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, cgrId, notify string) {
return
}
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
@@ -79,15 +79,18 @@ func (osm *OsipsSessionManager) RemoveSession(uuid string) {
func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return nil
}
func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration {
func (osm *OsipsSessionManager) DebitInterval() time.Duration {
var nilDuration time.Duration
return nilDuration
}
func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage {
func (osm *OsipsSessionManager) DbLogger() engine.LogStorage {
return nil
}
func (self *OsipsSessionManager) Rater() engine.Connector {
return self.rater
func (osm *OsipsSessionManager) Rater() engine.Connector {
return osm.rater
}
func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
return
}
func (osm *OsipsSessionManager) Shutdown() error {
return nil
@@ -143,13 +146,17 @@ func (osm *OsipsSessionManager) OnOpensipsStart(cdrDagram *osipsdagram.OsipsEven
go osm.SubscribeEvents(evStop)
}
func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) {
var reply string
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(cdrDagram)
storedCdr := osipsEv.AsStoredCdr()
if err := osm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error()))
}
}
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
// Process Authorize request from OpenSIPS and communicate back maxdur

View File

@@ -25,7 +25,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
)
// Session type holding the call information fields, a session delegate for specific
@@ -34,6 +33,8 @@ type Session struct {
eventStart utils.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid)
stopDebit chan bool // Channel to communicate with debit loops when closing the session
sessionManager SessionManager
connId string // Reference towards connection id on the session manager side.
warnMinDur time.Duration
sessionRuns []*engine.SessionRun
}
@@ -51,10 +52,11 @@ func (s *Session) SessionRuns() []*engine.SessionRun {
}
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
func NewSession(ev utils.Event, sm SessionManager) *Session {
func NewSession(ev utils.Event, connId string, sm SessionManager) *Session {
s := &Session{eventStart: ev,
stopDebit: make(chan bool),
sessionManager: sm,
connId: connId,
}
if err := sm.Rater().GetSessionRuns(ev, &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 {
return nil
@@ -69,7 +71,7 @@ func NewSession(ev utils.Event, sm SessionManager) *Session {
func (s *Session) debitLoop(runIdx int) {
nextCd := *s.sessionRuns[runIdx].CallDescriptor
index := 0.0
debitPeriod := s.sessionManager.GetDebitPeriod()
debitPeriod := s.sessionManager.DebitInterval()
for {
select {
case <-s.stopDebit:
@@ -83,19 +85,17 @@ func (s *Session) debitLoop(runIdx int) {
nextCd.LoopIndex = index
nextCd.DurationIndex += debitPeriod // first presumed duration
cc := new(engine.CallCost)
if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil {
if err := s.sessionManager.Rater().MaxDebit(nextCd, cc); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
s.sessionManager.DisconnectSession(s.eventStart, SYSTEM_ERROR)
s.sessionManager.DisconnectSession(s.eventStart, s.connId, SYSTEM_ERROR)
return
}
if cc.GetDuration() == 0 {
s.sessionManager.DisconnectSession(s.eventStart, INSUFFICIENT_FUNDS)
s.sessionManager.DisconnectSession(s.eventStart, s.connId, INSUFFICIENT_FUNDS)
return
}
if cc.GetDuration() <= cfg.FSMinDurLowBalance && len(cfg.FSLowBalanceAnnFile) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.eventStart.GetUUID(), cfg.FSLowBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send uuid_broadcast to freeswitch: %s", err.Error()))
}
if s.warnMinDur != time.Duration(0) && cc.GetDuration() <= s.warnMinDur {
s.sessionManager.WarnSessionMinDuration(s.eventStart.GetUUID(), s.connId)
}
s.sessionRuns[runIdx].CallCosts = append(s.sessionRuns[runIdx].CallCosts, cc)
nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd
@@ -204,9 +204,6 @@ func (s *Session) SaveOperations() {
for _, cc := range sr.CallCosts[1:] {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
s.sessionManager.DbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
}
}

View File

@@ -26,12 +26,13 @@ import (
)
type SessionManager interface {
Connect() error
DisconnectSession(utils.Event, string)
RemoveSession(string)
MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
GetDebitPeriod() time.Duration
GetDbLogger() engine.LogStorage
DbLogger() engine.LogStorage
Rater() engine.Connector
DebitInterval() time.Duration
Connect() error
DisconnectSession(utils.Event, string, string)
WarnSessionMinDuration(string, string)
RemoveSession(string)
ProcessCdr(*utils.StoredCdr) error
Shutdown() error
}