mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-17 06:09:53 +05:00
added code documentation
This commit is contained in:
@@ -88,9 +88,7 @@ func main() {
|
||||
go listenToJsonRPCRequests()
|
||||
|
||||
sm := &sessionmanager.SessionManager{}
|
||||
sm.Connect(*freeswitchsrv, *freeswitchpass)
|
||||
sm.SetSessionDelegate(new(sessionmanager.DirectSessionDelegate))
|
||||
sm.StartEventLoop()
|
||||
sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass)
|
||||
|
||||
listenToHttpRequests()
|
||||
}
|
||||
|
||||
@@ -153,9 +153,7 @@ func main() {
|
||||
}
|
||||
if *standalone {
|
||||
sm := &sessionmanager.SessionManager{}
|
||||
sm.Connect(*freeswitchsrv, *freeswitchpass)
|
||||
sm.SetSessionDelegate(new(sessionmanager.DirectSessionDelegate))
|
||||
sm.StartEventLoop()
|
||||
sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass)
|
||||
} else {
|
||||
go RegisterToServer(balancer, listen)
|
||||
go StopSingnalHandler(balancer, listen, getter)
|
||||
|
||||
@@ -24,15 +24,17 @@ import (
|
||||
"regexp"
|
||||
)
|
||||
|
||||
// Event type holding a mapping of all event's proprieties
|
||||
type Event struct {
|
||||
Fields map[string]string
|
||||
}
|
||||
|
||||
var (
|
||||
eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`)
|
||||
eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`) // for parsing the proprieties
|
||||
)
|
||||
|
||||
const (
|
||||
// Freswitch event proprities names
|
||||
CALL_DIRECTION = "Call-Direction"
|
||||
SUBJECT = "variable_sip_full_from"
|
||||
DESTINATION = "variable_sip_full_to"
|
||||
@@ -41,8 +43,8 @@ const (
|
||||
START_TIME = "Event-Date-GMT"
|
||||
)
|
||||
|
||||
//eventBodyRE *regexp.Regexp
|
||||
|
||||
// Creates a new event from a bod of text containing the key value proprieties.
|
||||
// It stores the parsed proprieties in the internal map.
|
||||
func NewEvent(body string) (ev *Event) {
|
||||
ev = &Event{Fields: make(map[string]string)}
|
||||
for _, fields := range eventBodyRE.FindAllStringSubmatch(body, -1) {
|
||||
@@ -55,6 +57,7 @@ func NewEvent(body string) (ev *Event) {
|
||||
return
|
||||
}
|
||||
|
||||
// Nice printing for the event object.
|
||||
func (ev *Event) String() (result string) {
|
||||
for k, v := range ev.Fields {
|
||||
result += fmt.Sprintf("%s = %s\n", k, v)
|
||||
|
||||
@@ -29,6 +29,8 @@ const (
|
||||
DEBIT_PERIOD = 10 * time.Second
|
||||
)
|
||||
|
||||
// Session type holding the call information fields, a session delegate for specific
|
||||
// actions and a channel to signal end of the debit loop.
|
||||
type Session struct {
|
||||
uuid, cstmId, subject, destination string
|
||||
startTime time.Time // destination: startTime
|
||||
@@ -36,6 +38,7 @@ type Session struct {
|
||||
stopDebit chan byte
|
||||
}
|
||||
|
||||
// Creates a new session and starts the debit loop
|
||||
func NewSession(ev *Event, ed SessionDelegate) (s *Session) {
|
||||
startTime, err := time.Parse(time.RFC1123, ev.Fields[START_TIME])
|
||||
if err != nil {
|
||||
@@ -49,11 +52,12 @@ func NewSession(ev *Event, ed SessionDelegate) (s *Session) {
|
||||
startTime: startTime,
|
||||
stopDebit: make(chan byte)}
|
||||
s.sessionDelegate = ed
|
||||
go s.StartDebitLoop()
|
||||
go s.startDebitLoop()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Session) StartDebitLoop() {
|
||||
// the debit loop method (to be stoped by sending somenting on stopDebit channel)
|
||||
func (s *Session) startDebitLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-s.stopDebit:
|
||||
@@ -65,7 +69,8 @@ func (s *Session) StartDebitLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) GetSessionDurationFrom(now time.Time) (d time.Duration) {
|
||||
// Returns the session duration till the specified time
|
||||
func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) {
|
||||
seconds := now.Sub(s.startTime).Seconds()
|
||||
d, err := time.ParseDuration(fmt.Sprintf("%ds", int(seconds)))
|
||||
if err != nil {
|
||||
@@ -74,13 +79,15 @@ func (s *Session) GetSessionDurationFrom(now time.Time) (d time.Duration) {
|
||||
return
|
||||
}
|
||||
|
||||
// Returns the session duration till now
|
||||
func (s *Session) GetSessionDuration() time.Duration {
|
||||
return s.GetSessionDurationFrom(time.Now())
|
||||
return s.getSessionDurationFrom(time.Now())
|
||||
}
|
||||
|
||||
func (s *Session) GetSessionCostFrom(now time.Time) (callCosts *timespans.CallCost, err error) {
|
||||
// Returns the session cost till the specified time
|
||||
func (s *Session) getSessionCostFrom(now time.Time) (callCosts *timespans.CallCost, err error) {
|
||||
cd := ×pans.CallDescriptor{TOR: 1, CstmId: s.cstmId, Subject: s.subject, DestinationPrefix: s.destination, TimeStart: s.startTime, TimeEnd: now}
|
||||
cd.SetStorageGetter(storageGetter)
|
||||
cd.SetStorageGetter(s.sessionDelegate.GetStorageGetter())
|
||||
callCosts, err = cd.GetCost()
|
||||
if err != nil {
|
||||
log.Printf("Error getting call cost for session %v", s)
|
||||
@@ -88,10 +95,12 @@ func (s *Session) GetSessionCostFrom(now time.Time) (callCosts *timespans.CallCo
|
||||
return
|
||||
}
|
||||
|
||||
// Returns the session duration till now
|
||||
func (s *Session) GetSessionCost() (callCosts *timespans.CallCost, err error) {
|
||||
return s.GetSessionCostFrom(time.Now())
|
||||
return s.getSessionCostFrom(time.Now())
|
||||
}
|
||||
|
||||
// Stops the debit loop
|
||||
func (s *Session) Close() {
|
||||
s.stopDebit <- 1
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestSessionDurationSingle(t *testing.T) {
|
||||
s := NewSession(newEvent, new(DirectSessionDelegate))
|
||||
defer s.Close()
|
||||
twoSeconds, _ := time.ParseDuration("2s")
|
||||
if d := s.GetSessionDurationFrom(s.startTime.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 {
|
||||
if d := s.getSessionDurationFrom(s.startTime.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 {
|
||||
t.Errorf("Wrong session duration %v", d)
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func TestSessionCostSingle(t *testing.T) {
|
||||
s := NewSession(newEvent, new(DirectSessionDelegate))
|
||||
defer s.Close()
|
||||
twoSeconds, _ := time.ParseDuration("60s")
|
||||
if cc, err := s.GetSessionCostFrom(s.startTime.Add(twoSeconds)); err != nil {
|
||||
if cc, err := s.getSessionCostFrom(s.startTime.Add(twoSeconds)); err != nil {
|
||||
t.Errorf("Get cost returned error %v", err)
|
||||
} else {
|
||||
if cc.Cost < 1 || cc.Cost > 1.1 {
|
||||
|
||||
@@ -19,16 +19,30 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"github.com/rif/cgrates/timespans"
|
||||
"log"
|
||||
)
|
||||
|
||||
var (
|
||||
// sample storage for the provided direct implementation
|
||||
storageGetter, _ = timespans.NewRedisStorage("tcp:127.0.0.1:6379", 10)
|
||||
)
|
||||
|
||||
// Interface for the session delegate objects
|
||||
type SessionDelegate interface {
|
||||
// Called on freeswitch's hearbeat event
|
||||
OnHeartBeat(*Event)
|
||||
// Called on freeswitch's answer event
|
||||
OnChannelAnswer(*Event, *Session)
|
||||
// Called on freeswitch's hangup event
|
||||
OnChannelHangupComplete(*Event, *Session)
|
||||
// The method to be called inside the debit loop
|
||||
LoopAction()
|
||||
// Returns a storage getter for the sesssion to use
|
||||
GetStorageGetter() timespans.StorageGetter
|
||||
}
|
||||
|
||||
// Sample SessionDelegate calling the timespans methods directly
|
||||
type DirectSessionDelegate byte
|
||||
|
||||
func (dsd *DirectSessionDelegate) OnHeartBeat(ev *Event) {
|
||||
@@ -47,7 +61,11 @@ func (dsd *DirectSessionDelegate) LoopAction() {
|
||||
log.Print("Direct debit")
|
||||
}
|
||||
|
||||
//
|
||||
func (dsd *DirectSessionDelegate) GetStorageGetter() timespans.StorageGetter {
|
||||
return storageGetter
|
||||
}
|
||||
|
||||
// Sample SessionDelegate calling the timespans methods through the RPC interface
|
||||
type RPCSessionDelegate byte
|
||||
|
||||
func (rsd *RPCSessionDelegate) OnHeartBeat(ev *Event) {
|
||||
@@ -65,3 +83,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev *Event, s *Session) {
|
||||
func (rsd *RPCSessionDelegate) LoopAction() {
|
||||
log.Print("Rpc debit")
|
||||
}
|
||||
|
||||
func (rsd *RPCSessionDelegate) GetStorageGetter() timespans.StorageGetter {
|
||||
return storageGetter
|
||||
}
|
||||
|
||||
@@ -21,22 +21,25 @@ package sessionmanager
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"github.com/rif/cgrates/timespans"
|
||||
"log"
|
||||
"net"
|
||||
)
|
||||
|
||||
var (
|
||||
storageGetter, _ = timespans.NewRedisStorage("tcp:127.0.0.1:6379", 10)
|
||||
)
|
||||
|
||||
// The freeswitch session manager type holding a buffer for the network connection,
|
||||
// the active sessions, and a session delegate doing specific actions on every session.
|
||||
type SessionManager struct {
|
||||
buf *bufio.Reader
|
||||
sessions []*Session
|
||||
sessionDelegate SessionDelegate
|
||||
}
|
||||
|
||||
func (sm *SessionManager) Connect(address, pass string) {
|
||||
// Connects to the freeswitch mod_event_socket server and starts
|
||||
// listening for events in json format.
|
||||
func (sm *SessionManager) Connect(ed SessionDelegate, address, pass string) {
|
||||
if ed == nil {
|
||||
log.Fatal("Please provide a non nil SessionDelegate")
|
||||
}
|
||||
sm.sessionDelegate = ed
|
||||
conn, err := net.Dial("tcp", address)
|
||||
if err != nil {
|
||||
log.Fatal("Could not connect to freeswitch server!")
|
||||
@@ -44,21 +47,17 @@ func (sm *SessionManager) Connect(address, pass string) {
|
||||
sm.buf = bufio.NewReaderSize(conn, 8192)
|
||||
fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass))
|
||||
fmt.Fprint(conn, "event json all\n\n")
|
||||
}
|
||||
|
||||
func (sm *SessionManager) StartEventLoop() {
|
||||
go func() {
|
||||
for {
|
||||
sm.ReadNextEvent()
|
||||
sm.readNextEvent()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (sm *SessionManager) SetSessionDelegate(ed SessionDelegate) {
|
||||
sm.sessionDelegate = ed
|
||||
}
|
||||
|
||||
func (sm *SessionManager) ReadNextEvent() (ev *Event) {
|
||||
// Reads from freeswitch server buffer until it encounters a '}',
|
||||
// than it creates an event object and calls the appropriate method
|
||||
// on the session delegate.
|
||||
func (sm *SessionManager) readNextEvent() (ev *Event) {
|
||||
body, err := sm.buf.ReadString('}')
|
||||
if err != nil {
|
||||
log.Print("Could not read from freeswitch connection!")
|
||||
@@ -77,6 +76,7 @@ func (sm *SessionManager) ReadNextEvent() (ev *Event) {
|
||||
return
|
||||
}
|
||||
|
||||
// Searches and return the session with the specifed uuid
|
||||
func (sm *SessionManager) GetSession(uuid string) *Session {
|
||||
for _, s := range sm.sessions {
|
||||
if s.uuid == uuid {
|
||||
@@ -86,6 +86,7 @@ func (sm *SessionManager) GetSession(uuid string) *Session {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Called on freeswitch's hearbeat event
|
||||
func (sm *SessionManager) OnHeartBeat(ev *Event) {
|
||||
if sm.sessionDelegate != nil {
|
||||
sm.sessionDelegate.OnHeartBeat(ev)
|
||||
@@ -94,6 +95,7 @@ func (sm *SessionManager) OnHeartBeat(ev *Event) {
|
||||
}
|
||||
}
|
||||
|
||||
// Called on freeswitch's answer event
|
||||
func (sm *SessionManager) OnChannelAnswer(ev *Event) {
|
||||
if sm.sessionDelegate != nil {
|
||||
s := NewSession(ev, sm.sessionDelegate)
|
||||
@@ -103,6 +105,7 @@ func (sm *SessionManager) OnChannelAnswer(ev *Event) {
|
||||
}
|
||||
}
|
||||
|
||||
// Called on freeswitch's hangup event
|
||||
func (sm *SessionManager) OnChannelHangupComplete(ev *Event) {
|
||||
s := sm.GetSession(ev.Fields[UUID])
|
||||
if sm.sessionDelegate != nil {
|
||||
@@ -113,6 +116,8 @@ func (sm *SessionManager) OnChannelHangupComplete(ev *Event) {
|
||||
s.Close()
|
||||
}
|
||||
|
||||
// Called on freeswitch's events not processed by the session manger,
|
||||
// for logging purposes (maybe).
|
||||
func (sm *SessionManager) OnOther(ev *Event) {
|
||||
//log.Printf("Other event: %s", ev.Fields["Event-Name"])
|
||||
}
|
||||
|
||||
@@ -25,10 +25,9 @@ import (
|
||||
|
||||
func TestConnect(t *testing.T) {
|
||||
sm := &SessionManager{}
|
||||
sm.Connect("localhost:8021", "ClueCon")
|
||||
sm.SetSessionDelegate(new(DirectSessionDelegate))
|
||||
sm.Connect(new(DirectSessionDelegate), "localhost:8021", "ClueCon")
|
||||
//for {
|
||||
ev := sm.ReadNextEvent()
|
||||
ev := sm.readNextEvent()
|
||||
if ev == nil {
|
||||
t.Error("Got nil event!")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user