diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index e6cff590c..0bd27d1ef 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -188,8 +188,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
switch cfg.SMSwitchType {
case FS:
- dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
- sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp)
+ sm = sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
case KAMAILIO:
var debitInterval time.Duration
if debitInterval, err = utils.ParseDurationWithSecs(strconv.Itoa(cfg.SMDebitInterval)); err != nil {
diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go
index c52201b10..1db022952 100644
--- a/sessionmanager/fsevent_test.go
+++ b/sessionmanager/fsevent_test.go
@@ -417,7 +417,7 @@ Task-ID: 2
Task-Desc: heartbeat
Task-Group: core
Task-Runtime: 1349437318`
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(body)
setupTime, _ := ev.GetSetupTime("Event-Date-Local")
@@ -471,7 +471,7 @@ Task-Runtime: 1349437318`
}
func TestParseFsHangup(t *testing.T) {
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
setupTime, _ := ev.GetSetupTime(utils.META_DEFAULT)
@@ -502,7 +502,7 @@ func TestParseFsHangup(t *testing.T) {
}
func TestParseEventValue(t *testing.T) {
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
if cgrid := ev.ParseEventValue(&utils.RSRField{Id: utils.CGRID}); cgrid != "873e5bf7903978f305f7d8fed3f92f968cf82873" {
@@ -613,7 +613,7 @@ Caller-Username: 04021292812`
}
func TestFsEvAsStoredCdr(t *testing.T) {
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
setupTime, _ := utils.ParseTimeDetectLayout("1398442107")
@@ -629,7 +629,7 @@ func TestFsEvAsStoredCdr(t *testing.T) {
}
func TestFsEvGetExtraFields(t *testing.T) {
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
cfg.FSCdrExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}}
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go
index b135ebc41..7eddcfdac 100644
--- a/sessionmanager/fssessionmanager.go
+++ b/sessionmanager/fssessionmanager.go
@@ -19,11 +19,9 @@ along with this program. If not, see
package sessionmanager
import (
- "bufio"
"errors"
"fmt"
"log/syslog"
- "net"
"time"
"github.com/cgrates/cgrates/config"
@@ -32,62 +30,63 @@ import (
"github.com/cgrates/fsock"
)
-var cfg *config.CGRConfig // Share the configuration with the rest of the package
-
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
- conn net.Conn
- buf *bufio.Reader
- sessions []*Session
- rater engine.Connector
- cdrs engine.Connector
- debitPeriod time.Duration
- loggerDB engine.LogStorage
+ cfg *config.SmFsConfig
+ conns map[string]*fsock.FSock // Keep the list here for connection management purposes
+ sessions []*Session
+ rater engine.Connector
+ cdrs engine.Connector
+ loggerDB engine.LogStorage
}
-func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, rater, cdrs engine.Connector, debitPeriod time.Duration) *FSSessionManager {
- cfg = cgrCfg // make config global
- return &FSSessionManager{loggerDB: storage, rater: rater, cdrs: cdrs, debitPeriod: debitPeriod}
+func NewFSSessionManager(smFsConfig *config.SmFsConfig, storage engine.LogStorage, rater, cdrs engine.Connector) *FSSessionManager {
+ return &FSSessionManager{cfg: smFsConfig, conns: make(map[string]*fsock.FSock), loggerDB: storage, rater: rater, cdrs: cdrs}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events.
-func (sm *FSSessionManager) Connect() (err error) {
+func (sm *FSSessionManager) Connect() error {
eventFilters := map[string]string{"Call-Direction": "inbound"}
- if fsock.FS, err = fsock.NewFSock(cfg.FreeswitchServer, cfg.FreeswitchPass, cfg.FreeswitchReconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer)); err != nil {
- return err
- } else if !fsock.FS.Connected() {
- return errors.New("Cannot connect to FreeSWITCH")
+ errChan := make(chan error)
+ for _, connCfg := range sm.cfg.Connections {
+ connId := utils.GenUUID()
+ fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer), connId)
+ if err != nil {
+ errChan <- err
+ } else if !fSock.Connected() {
+ errChan <- errors.New("Could not connect to FreeSWITCH")
+ } else {
+ sm.conns[connId] = fSock
+ }
+ go func() { // Start reading in own goroutine, return on error
+ if err := fsock.FS.ReadEvents(); err != nil {
+ errChan <- err
+ }
+ }()
}
- if err := fsock.FS.ReadEvents(); err != nil {
- return err
- }
- return errors.New(" - Stopped reading events")
+ err := <-errChan // Will keep the Connect locked until the first error in one of the connections
+ return err
}
-func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string)) {
- hb := func(body string) {
+func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string, string)) {
+ cp := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
- sm.OnHeartBeat(ev)
+ sm.onChannelPark(ev, connId)
}
- cp := func(body string) {
+ ca := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
- sm.OnChannelPark(ev)
+ sm.onChannelAnswer(ev, connId)
}
- ca := func(body string) {
+ ch := func(body, connId string) {
ev := new(FSEvent).AsEvent(body)
- sm.OnChannelAnswer(ev)
+ sm.onChannelHangupComplete(ev)
}
- ch := func(body string) {
- ev := new(FSEvent).AsEvent(body)
- sm.OnChannelHangupComplete(ev)
- }
- return map[string][]func(string){
- "HEARTBEAT": []func(string){hb},
- "CHANNEL_PARK": []func(string){cp},
- "CHANNEL_ANSWER": []func(string){ca},
- "CHANNEL_HANGUP_COMPLETE": []func(string){ch},
+ return map[string][]func(string, string){
+ "CHANNEL_PARK": []func(string, string){cp},
+ "CHANNEL_ANSWER": []func(string, string){ca},
+ "CHANNEL_HANGUP_COMPLETE": []func(string, string){ch},
}
}
@@ -102,25 +101,25 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
}
// Disconnects a session by sending hangup command to freeswitch
-func (sm *FSSessionManager) DisconnectSession(ev utils.Event, notify string) {
- if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
- engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch: %s", err.Error()))
+func (sm *FSSessionManager) DisconnectSession(ev utils.Event, connId, notify string) {
+ if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
if notify == INSUFFICIENT_FUNDS {
- if len(cfg.FSEmptyBalanceContext) != 0 {
- if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), cfg.FSEmptyBalanceContext)); err != nil {
- engine.Logger.Err(" Could not transfer the call to empty balance context")
+ if len(sm.cfg.EmptyBalanceContext) != 0 {
+ if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId))
}
return
- } else if len(cfg.FSEmptyBalanceAnnFile) != 0 {
- if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), cfg.FSEmptyBalanceAnnFile)); err != nil {
- engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error()))
+ } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
+ if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
return
}
}
- if err := fsock.FS.SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
- engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch: %v", err))
+ if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
return
}
@@ -136,112 +135,54 @@ func (sm *FSSessionManager) RemoveSession(uuid string) {
}
// Sets the call timeout valid of starting of the call
-func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error {
+func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration) error {
// _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid))
- _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds())))
+ _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds())))
if err != nil {
- engine.Logger.Err("could not send sched_hangup command to freeswitch")
+ engine.Logger.Err(fmt.Sprintf(" Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
// Sends the transfer command to unpark the call to freeswitch
-func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
- _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
+func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) {
+ _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
- engine.Logger.Err(" Could not send unpark api notification to freeswitch")
+ engine.Logger.Err(fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
- if _, err = fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
- engine.Logger.Err(" Could not send unpark api call to freeswitch")
+ if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
}
}
-func (sm *FSSessionManager) OnHeartBeat(ev utils.Event) {
- engine.Logger.Info("freeswitch ♥")
-}
-
-func (sm *FSSessionManager) OnChannelPark(ev utils.Event) {
- var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
- var durInitialized bool
- attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
- Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
- var dcs utils.DerivedChargers
- if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
- engine.Logger.Err(fmt.Sprintf(" OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
- sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
- return
+func (sm *FSSessionManager) onChannelPark(ev utils.Event, connId string) {
+ var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last
+ if err := sm.rater.GetDerivedMaxSessionTime(ev, &maxCallDuration); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error()))
}
- dcs, _ = dcs.AppendDefaultRun()
- for _, dc := range dcs {
- runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
- matchingAllFilters := true
- for _, dcRunFilter := range runFilters {
- if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
- matchingAllFilters = false
- break
- }
- }
- if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
- continue
- }
- startTime, err := ev.GetAnswerTime(PARK_TIME)
- if err != nil {
- engine.Logger.Err("Error parsing answer event start time, using time.Now!")
- startTime = time.Now()
- }
- if ev.MissingParameter() {
- sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER)
- engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
- return
- }
- cd := engine.CallDescriptor{
- Direction: ev.GetDirection(dc.DirectionField),
- Tenant: ev.GetTenant(dc.TenantField),
- Category: ev.GetCategory(dc.CategoryField),
- Subject: ev.GetSubject(dc.SubjectField),
- Account: ev.GetAccount(dc.AccountField),
- Destination: ev.GetDestination(dc.DestinationField),
- TimeStart: startTime,
- TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
- }
- var remainingDurationFloat float64
- err = sm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
- if err != nil {
- engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
- sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination
- return
- }
- remainingDuration := time.Duration(remainingDurationFloat)
- // Set maxCallDuration, smallest out of all forked sessions
- if !durInitialized { // first time we set it /not initialized yet
- maxCallDuration = remainingDuration
- durInitialized = true
- } else if maxCallDuration > remainingDuration {
- maxCallDuration = remainingDuration
- }
- }
- if maxCallDuration <= cfg.SMMinCallDuration {
+ maxCallDur := time.Duration(maxCallDuration)
+ if maxCallDur <= sm.cfg.MinCallDuration {
//engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
- sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
+ sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS)
return
}
- sm.setMaxCallDuration(ev.GetUUID(), maxCallDuration)
- sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
+ sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur)
+ sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK)
}
-func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) {
+func (sm *FSSessionManager) onChannelAnswer(ev utils.Event, connId string) {
if ev.MissingParameter() {
- sm.DisconnectSession(ev, MISSING_PARAMETER)
+ sm.DisconnectSession(ev, connId, MISSING_PARAMETER)
}
- s := NewSession(ev, sm)
+ s := NewSession(ev, connId, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
}
}
-func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
- go sm.processCdr(ev.AsStoredCdr())
+func (sm *FSSessionManager) onChannelHangupComplete(ev utils.Event) {
+ go sm.ProcessCdr(ev.AsStoredCdr())
s := sm.GetSession(ev.GetUUID())
if s == nil { // Not handled by us
return
@@ -252,7 +193,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
}
}
-func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error {
+func (sm *FSSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error {
if sm.cdrs != nil {
var reply string
if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil {
@@ -263,15 +204,10 @@ func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error {
return nil
}
-func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
- return sm.debitPeriod
+func (sm *FSSessionManager) DebitInterval() time.Duration {
+ return sm.cfg.DebitInterval
}
-
-func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
- return sm.rater.MaxDebit(*cd, cc)
-}
-
-func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {
+func (sm *FSSessionManager) DbLogger() engine.LogStorage {
return sm.loggerDB
}
@@ -279,17 +215,27 @@ func (sm *FSSessionManager) Rater() engine.Connector {
return sm.rater
}
-func (sm *FSSessionManager) Shutdown() (err error) {
- if fsock.FS == nil || !fsock.FS.Connected() {
- return errors.New("Cannot shutdown sessions, fsock not connected")
+// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message
+func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
+ if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: %s, connection id: %s", err.Error(), connId))
}
- engine.Logger.Info("Shutting down all sessions...")
- if _, err = fsock.FS.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil {
- engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err))
+}
+
+func (sm *FSSessionManager) Shutdown() (err error) {
+ for connId, fSock := range sm.conns {
+ if !fSock.Connected() {
+ engine.Logger.Err(fmt.Sprintf(" Cannot shutdown sessions, fsock not connected for connection id: %s", connId))
+ continue
+ }
+ engine.Logger.Info(fmt.Sprintf(" Shutting down all sessions on connection id: %s", connId))
+ if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Error on calls shutdown: %s, connection id: %s", err.Error(), connId))
+ }
}
for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ {
time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired
- engine.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions))
+ engine.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions))
}
- return
+ return nil
}
diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go
index 301204059..14c91841c 100644
--- a/sessionmanager/kamailiosm.go
+++ b/sessionmanager/kamailiosm.go
@@ -75,10 +75,10 @@ func (self *KamailioSessionManager) onCallStart(evData []byte) {
engine.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kamEv.MissingParameter() {
- self.DisconnectSession(kamEv, utils.ERR_MANDATORY_IE_MISSING)
+ self.DisconnectSession(kamEv, "", utils.ERR_MANDATORY_IE_MISSING)
return
}
- s := NewSession(kamEv, self)
+ s := NewSession(kamEv, "", self)
if s != nil {
self.sessions = append(self.sessions, s)
}
@@ -119,7 +119,7 @@ func (self *KamailioSessionManager) Connect() error {
return errors.New(" Stopped reading events")
}
-func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, notify string) {
+func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, connId, notify string) {
sessionIds := ev.GetSessionIds()
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
if err := self.kea.Send(disconnectEv.String()); err != nil {
@@ -149,24 +149,28 @@ func (self *KamailioSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engi
return self.rater.MaxDebit(*cd, cc)
}
-func (self *KamailioSessionManager) GetDebitPeriod() time.Duration {
+func (self *KamailioSessionManager) DebitInterval() time.Duration {
return self.debitInterval
}
-func (self *KamailioSessionManager) GetDbLogger() engine.LogStorage {
+func (self *KamailioSessionManager) DbLogger() engine.LogStorage {
return self.loggerDb
}
func (self *KamailioSessionManager) Rater() engine.Connector {
return self.rater
}
-func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) {
+func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) error {
if self.cdrsrv == nil {
- return
+ return nil
}
var reply string
if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error()))
}
+ return nil
+}
+
+func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
}
func (self *KamailioSessionManager) Shutdown() error {
return nil
diff --git a/sessionmanager/osipsevent_test.go b/sessionmanager/osipsevent_test.go
index a6a5063e9..63e07eec0 100644
--- a/sessionmanager/osipsevent_test.go
+++ b/sessionmanager/osipsevent_test.go
@@ -68,7 +68,7 @@ func TestOsipsEventParseStatic(t *testing.T) {
}
func TestOsipsEventGetValues(t *testing.T) {
- cfg, _ = config.NewDefaultCGRConfig()
+ cfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
setupTime, _ := osipsEv.GetSetupTime(utils.META_DEFAULT)
eSetupTime, _ := utils.ParseTimeDetectLayout("1406370492")
diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go
index bfed586c0..7550f558d 100644
--- a/sessionmanager/osipssm.go
+++ b/sessionmanager/osipssm.go
@@ -35,7 +35,7 @@ func NewOSipsSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connecto
osm := &OsipsSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart},
- "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr},
+ "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr},
"E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.OnAuthorize},
}
return osm, nil
@@ -70,7 +70,7 @@ func (osm *OsipsSessionManager) Connect() (err error) {
return errors.New(" Stopped reading events")
}
-func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, notify string) {
+func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, cgrId, notify string) {
return
}
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
@@ -79,15 +79,18 @@ func (osm *OsipsSessionManager) RemoveSession(uuid string) {
func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return nil
}
-func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration {
+func (osm *OsipsSessionManager) DebitInterval() time.Duration {
var nilDuration time.Duration
return nilDuration
}
-func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage {
+func (osm *OsipsSessionManager) DbLogger() engine.LogStorage {
return nil
}
-func (self *OsipsSessionManager) Rater() engine.Connector {
- return self.rater
+func (osm *OsipsSessionManager) Rater() engine.Connector {
+ return osm.rater
+}
+func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
+ return
}
func (osm *OsipsSessionManager) Shutdown() error {
return nil
@@ -143,13 +146,17 @@ func (osm *OsipsSessionManager) OnOpensipsStart(cdrDagram *osipsdagram.OsipsEven
go osm.SubscribeEvents(evStop)
}
-func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) {
- var reply string
+func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(cdrDagram)
- storedCdr := osipsEv.AsStoredCdr()
- if err := osm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
- engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
+ if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil {
+ engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error()))
}
+
+}
+
+func (osm *OsipsSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error {
+ var reply string
+ return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
// Process Authorize request from OpenSIPS and communicate back maxdur
diff --git a/sessionmanager/session.go b/sessionmanager/session.go
index 10981c30d..12f782c0c 100644
--- a/sessionmanager/session.go
+++ b/sessionmanager/session.go
@@ -25,7 +25,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
- "github.com/cgrates/fsock"
)
// Session type holding the call information fields, a session delegate for specific
@@ -34,6 +33,8 @@ type Session struct {
eventStart utils.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid)
stopDebit chan bool // Channel to communicate with debit loops when closing the session
sessionManager SessionManager
+ connId string // Reference towards connection id on the session manager side.
+ warnMinDur time.Duration
sessionRuns []*engine.SessionRun
}
@@ -51,10 +52,11 @@ func (s *Session) SessionRuns() []*engine.SessionRun {
}
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
-func NewSession(ev utils.Event, sm SessionManager) *Session {
+func NewSession(ev utils.Event, connId string, sm SessionManager) *Session {
s := &Session{eventStart: ev,
stopDebit: make(chan bool),
sessionManager: sm,
+ connId: connId,
}
if err := sm.Rater().GetSessionRuns(ev, &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 {
return nil
@@ -69,7 +71,7 @@ func NewSession(ev utils.Event, sm SessionManager) *Session {
func (s *Session) debitLoop(runIdx int) {
nextCd := *s.sessionRuns[runIdx].CallDescriptor
index := 0.0
- debitPeriod := s.sessionManager.GetDebitPeriod()
+ debitPeriod := s.sessionManager.DebitInterval()
for {
select {
case <-s.stopDebit:
@@ -83,19 +85,17 @@ func (s *Session) debitLoop(runIdx int) {
nextCd.LoopIndex = index
nextCd.DurationIndex += debitPeriod // first presumed duration
cc := new(engine.CallCost)
- if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil {
+ if err := s.sessionManager.Rater().MaxDebit(nextCd, cc); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
- s.sessionManager.DisconnectSession(s.eventStart, SYSTEM_ERROR)
+ s.sessionManager.DisconnectSession(s.eventStart, s.connId, SYSTEM_ERROR)
return
}
if cc.GetDuration() == 0 {
- s.sessionManager.DisconnectSession(s.eventStart, INSUFFICIENT_FUNDS)
+ s.sessionManager.DisconnectSession(s.eventStart, s.connId, INSUFFICIENT_FUNDS)
return
}
- if cc.GetDuration() <= cfg.FSMinDurLowBalance && len(cfg.FSLowBalanceAnnFile) != 0 {
- if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.eventStart.GetUUID(), cfg.FSLowBalanceAnnFile)); err != nil {
- engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error()))
- }
+ if s.warnMinDur != time.Duration(0) && cc.GetDuration() <= s.warnMinDur {
+ s.sessionManager.WarnSessionMinDuration(s.eventStart.GetUUID(), s.connId)
}
s.sessionRuns[runIdx].CallCosts = append(s.sessionRuns[runIdx].CallCosts, cc)
nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd
@@ -204,9 +204,6 @@ func (s *Session) SaveOperations() {
for _, cc := range sr.CallCosts[1:] {
firstCC.Merge(cc)
}
- if s.sessionManager.GetDbLogger() == nil {
- engine.Logger.Err(" Error: no connection to logger database, cannot save costs")
- }
- s.sessionManager.GetDbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
+ s.sessionManager.DbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
}
}
diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go
index 34ad293db..95f9d9462 100644
--- a/sessionmanager/sessionmanager.go
+++ b/sessionmanager/sessionmanager.go
@@ -26,12 +26,13 @@ import (
)
type SessionManager interface {
- Connect() error
- DisconnectSession(utils.Event, string)
- RemoveSession(string)
- MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
- GetDebitPeriod() time.Duration
- GetDbLogger() engine.LogStorage
+ DbLogger() engine.LogStorage
Rater() engine.Connector
+ DebitInterval() time.Duration
+ Connect() error
+ DisconnectSession(utils.Event, string, string)
+ WarnSessionMinDuration(string, string)
+ RemoveSession(string)
+ ProcessCdr(*utils.StoredCdr) error
Shutdown() error
}