SM-OpenSIPS with better session locking

This commit is contained in:
DanB
2015-11-02 10:58:33 +01:00
parent cb91c3b9ef
commit 9aa33a3231

View File

@@ -81,7 +81,7 @@ duration::
*/
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, reconnects int, rater, cdrsrv engine.Connector, timezone string) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, reconnects: reconnects, rater: rater, cdrsrv: cdrsrv, timezone: timezone, cdrStartEvents: make(map[string]*OsipsEvent)}
osm := &OsipsSessionManager{cfg: smOsipsCfg, reconnects: reconnects, rater: rater, cdrsrv: cdrsrv, timezone: timezone, cdrStartEvents: make(map[string]*OsipsEvent), sessions: NewSessions()}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured
@@ -101,7 +101,7 @@ type OsipsSessionManager struct {
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
stopServing chan struct{} // Stop serving datagrams
miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote
sessions []*Session
sessions *Sessions
cdrStartEvents map[string]*OsipsEvent // Used when building CDRs, ToDo: secure access to map
}
@@ -124,16 +124,6 @@ func (osm *OsipsSessionManager) Connect() (err error) {
return errors.New("<SM-OpenSIPS> Stopped reading events")
}
// Removes a session on call end
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
for i, ss := range osm.sessions {
if ss.eventStart.GetUUID() == uuid {
osm.sessions = append(osm.sessions[:i], osm.sessions[i+1:]...)
return
}
}
}
// DebitInterval will give out the frequence of the debits sent to engine
func (osm *OsipsSessionManager) DebitInterval() time.Duration {
return osm.cfg.DebitInterval
@@ -277,26 +267,25 @@ func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error {
}
s := NewSession(osipsEv, "", osm)
if s != nil {
osm.sessions = append(osm.sessions, s)
osm.sessions.indexSession(s)
}
return nil
}
// Handler for callEnd. Mostly removes a session if needed
func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error {
s := osm.getSession(osipsEv.GetUUID())
s := osm.sessions.getSession(osipsEv.GetUUID())
if s == nil { // Not handled by us
return nil
}
osm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
origEvent := s.eventStart.(*OsipsEvent) // Need a complete event for methods in close
origEvent := s.eventStart.(*OsipsEvent) // Need a complete event for methods in close
if err := origEvent.updateDurationFromEvent(osipsEv); err != nil {
return err
}
if origEvent.MissingParameter() {
return utils.ErrMandatoryIeMissing
}
if err := s.Close(origEvent); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
if err := osm.sessions.removeSession(s, origEvent); err != nil { // Unreference it early so we avoid concurrency
return err
}
return nil
@@ -335,18 +324,8 @@ func (osm *OsipsSessionManager) processCdrStop(osipsEv *OsipsEvent) error {
return osm.ProcessCdr(osipsEvStart.AsStoredCdr(osm.timezone))
}
// Searches and return the session with the specifed uuid
func (osm *OsipsSessionManager) getSession(uuid string) *Session {
for _, s := range osm.sessions {
if s.eventStart.GetUUID() == uuid {
return s
}
}
return nil
}
func (osm *OsipsSessionManager) Sessions() []*Session {
return osm.sessions
return osm.sessions.getSessions()
}
// Sync sessions with FS