diff --git a/cmd/cgr-balancer/cgr-balanncer.go b/cmd/cgr-balancer/cgr-balanncer.go index 317a6430d..6b388403d 100644 --- a/cmd/cgr-balancer/cgr-balanncer.go +++ b/cmd/cgr-balancer/cgr-balanncer.go @@ -88,9 +88,7 @@ func main() { go listenToJsonRPCRequests() sm := &sessionmanager.SessionManager{} - sm.Connect(*freeswitchsrv, *freeswitchpass) - sm.SetSessionDelegate(new(sessionmanager.DirectSessionDelegate)) - sm.StartEventLoop() + sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass) listenToHttpRequests() } diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 9ab0808d0..a7aebd7a5 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -153,9 +153,7 @@ func main() { } if *standalone { sm := &sessionmanager.SessionManager{} - sm.Connect(*freeswitchsrv, *freeswitchpass) - sm.SetSessionDelegate(new(sessionmanager.DirectSessionDelegate)) - sm.StartEventLoop() + sm.Connect(new(sessionmanager.DirectSessionDelegate), *freeswitchsrv, *freeswitchpass) } else { go RegisterToServer(balancer, listen) go StopSingnalHandler(balancer, listen, getter) diff --git a/sessionmanager/event.go b/sessionmanager/event.go index 9e0137e05..cf2a88404 100644 --- a/sessionmanager/event.go +++ b/sessionmanager/event.go @@ -24,15 +24,17 @@ import ( "regexp" ) +// Event type holding a mapping of all event's proprieties type Event struct { Fields map[string]string } var ( - eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`) + eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`) // for parsing the proprieties ) const ( + // Freswitch event proprities names CALL_DIRECTION = "Call-Direction" SUBJECT = "variable_sip_full_from" DESTINATION = "variable_sip_full_to" @@ -41,8 +43,8 @@ const ( START_TIME = "Event-Date-GMT" ) -//eventBodyRE *regexp.Regexp - +// Creates a new event from a bod of text containing the key value proprieties. +// It stores the parsed proprieties in the internal map. func NewEvent(body string) (ev *Event) { ev = &Event{Fields: make(map[string]string)} for _, fields := range eventBodyRE.FindAllStringSubmatch(body, -1) { @@ -55,6 +57,7 @@ func NewEvent(body string) (ev *Event) { return } +// Nice printing for the event object. func (ev *Event) String() (result string) { for k, v := range ev.Fields { result += fmt.Sprintf("%s = %s\n", k, v) diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 91c1518af..324e0ab7c 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -29,6 +29,8 @@ const ( DEBIT_PERIOD = 10 * time.Second ) +// Session type holding the call information fields, a session delegate for specific +// actions and a channel to signal end of the debit loop. type Session struct { uuid, cstmId, subject, destination string startTime time.Time // destination: startTime @@ -36,6 +38,7 @@ type Session struct { stopDebit chan byte } +// Creates a new session and starts the debit loop func NewSession(ev *Event, ed SessionDelegate) (s *Session) { startTime, err := time.Parse(time.RFC1123, ev.Fields[START_TIME]) if err != nil { @@ -49,11 +52,12 @@ func NewSession(ev *Event, ed SessionDelegate) (s *Session) { startTime: startTime, stopDebit: make(chan byte)} s.sessionDelegate = ed - go s.StartDebitLoop() + go s.startDebitLoop() return } -func (s *Session) StartDebitLoop() { +// the debit loop method (to be stoped by sending somenting on stopDebit channel) +func (s *Session) startDebitLoop() { for { select { case <-s.stopDebit: @@ -65,7 +69,8 @@ func (s *Session) StartDebitLoop() { } } -func (s *Session) GetSessionDurationFrom(now time.Time) (d time.Duration) { +// Returns the session duration till the specified time +func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) { seconds := now.Sub(s.startTime).Seconds() d, err := time.ParseDuration(fmt.Sprintf("%ds", int(seconds))) if err != nil { @@ -74,13 +79,15 @@ func (s *Session) GetSessionDurationFrom(now time.Time) (d time.Duration) { return } +// Returns the session duration till now func (s *Session) GetSessionDuration() time.Duration { - return s.GetSessionDurationFrom(time.Now()) + return s.getSessionDurationFrom(time.Now()) } -func (s *Session) GetSessionCostFrom(now time.Time) (callCosts *timespans.CallCost, err error) { +// Returns the session cost till the specified time +func (s *Session) getSessionCostFrom(now time.Time) (callCosts *timespans.CallCost, err error) { cd := ×pans.CallDescriptor{TOR: 1, CstmId: s.cstmId, Subject: s.subject, DestinationPrefix: s.destination, TimeStart: s.startTime, TimeEnd: now} - cd.SetStorageGetter(storageGetter) + cd.SetStorageGetter(s.sessionDelegate.GetStorageGetter()) callCosts, err = cd.GetCost() if err != nil { log.Printf("Error getting call cost for session %v", s) @@ -88,10 +95,12 @@ func (s *Session) GetSessionCostFrom(now time.Time) (callCosts *timespans.CallCo return } +// Returns the session duration till now func (s *Session) GetSessionCost() (callCosts *timespans.CallCost, err error) { - return s.GetSessionCostFrom(time.Now()) + return s.getSessionCostFrom(time.Now()) } +// Stops the debit loop func (s *Session) Close() { s.stopDebit <- 1 } diff --git a/sessionmanager/session_test.go b/sessionmanager/session_test.go index 572ff0c72..e342fde6e 100644 --- a/sessionmanager/session_test.go +++ b/sessionmanager/session_test.go @@ -55,7 +55,7 @@ func TestSessionDurationSingle(t *testing.T) { s := NewSession(newEvent, new(DirectSessionDelegate)) defer s.Close() twoSeconds, _ := time.ParseDuration("2s") - if d := s.GetSessionDurationFrom(s.startTime.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 { + if d := s.getSessionDurationFrom(s.startTime.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 { t.Errorf("Wrong session duration %v", d) } } @@ -64,7 +64,7 @@ func TestSessionCostSingle(t *testing.T) { s := NewSession(newEvent, new(DirectSessionDelegate)) defer s.Close() twoSeconds, _ := time.ParseDuration("60s") - if cc, err := s.GetSessionCostFrom(s.startTime.Add(twoSeconds)); err != nil { + if cc, err := s.getSessionCostFrom(s.startTime.Add(twoSeconds)); err != nil { t.Errorf("Get cost returned error %v", err) } else { if cc.Cost < 1 || cc.Cost > 1.1 { diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index 308ee560b..586369bbc 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -19,16 +19,30 @@ along with this program. If not, see package sessionmanager import ( + "github.com/rif/cgrates/timespans" "log" ) +var ( + // sample storage for the provided direct implementation + storageGetter, _ = timespans.NewRedisStorage("tcp:127.0.0.1:6379", 10) +) + +// Interface for the session delegate objects type SessionDelegate interface { + // Called on freeswitch's hearbeat event OnHeartBeat(*Event) + // Called on freeswitch's answer event OnChannelAnswer(*Event, *Session) + // Called on freeswitch's hangup event OnChannelHangupComplete(*Event, *Session) + // The method to be called inside the debit loop LoopAction() + // Returns a storage getter for the sesssion to use + GetStorageGetter() timespans.StorageGetter } +// Sample SessionDelegate calling the timespans methods directly type DirectSessionDelegate byte func (dsd *DirectSessionDelegate) OnHeartBeat(ev *Event) { @@ -47,7 +61,11 @@ func (dsd *DirectSessionDelegate) LoopAction() { log.Print("Direct debit") } -// +func (dsd *DirectSessionDelegate) GetStorageGetter() timespans.StorageGetter { + return storageGetter +} + +// Sample SessionDelegate calling the timespans methods through the RPC interface type RPCSessionDelegate byte func (rsd *RPCSessionDelegate) OnHeartBeat(ev *Event) { @@ -65,3 +83,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev *Event, s *Session) { func (rsd *RPCSessionDelegate) LoopAction() { log.Print("Rpc debit") } + +func (rsd *RPCSessionDelegate) GetStorageGetter() timespans.StorageGetter { + return storageGetter +} diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index f6299501d..118ddb7ad 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -21,22 +21,25 @@ package sessionmanager import ( "bufio" "fmt" - "github.com/rif/cgrates/timespans" "log" "net" ) -var ( - storageGetter, _ = timespans.NewRedisStorage("tcp:127.0.0.1:6379", 10) -) - +// The freeswitch session manager type holding a buffer for the network connection, +// the active sessions, and a session delegate doing specific actions on every session. type SessionManager struct { buf *bufio.Reader sessions []*Session sessionDelegate SessionDelegate } -func (sm *SessionManager) Connect(address, pass string) { +// Connects to the freeswitch mod_event_socket server and starts +// listening for events in json format. +func (sm *SessionManager) Connect(ed SessionDelegate, address, pass string) { + if ed == nil { + log.Fatal("Please provide a non nil SessionDelegate") + } + sm.sessionDelegate = ed conn, err := net.Dial("tcp", address) if err != nil { log.Fatal("Could not connect to freeswitch server!") @@ -44,21 +47,17 @@ func (sm *SessionManager) Connect(address, pass string) { sm.buf = bufio.NewReaderSize(conn, 8192) fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass)) fmt.Fprint(conn, "event json all\n\n") -} - -func (sm *SessionManager) StartEventLoop() { go func() { for { - sm.ReadNextEvent() + sm.readNextEvent() } }() } -func (sm *SessionManager) SetSessionDelegate(ed SessionDelegate) { - sm.sessionDelegate = ed -} - -func (sm *SessionManager) ReadNextEvent() (ev *Event) { +// Reads from freeswitch server buffer until it encounters a '}', +// than it creates an event object and calls the appropriate method +// on the session delegate. +func (sm *SessionManager) readNextEvent() (ev *Event) { body, err := sm.buf.ReadString('}') if err != nil { log.Print("Could not read from freeswitch connection!") @@ -77,6 +76,7 @@ func (sm *SessionManager) ReadNextEvent() (ev *Event) { return } +// Searches and return the session with the specifed uuid func (sm *SessionManager) GetSession(uuid string) *Session { for _, s := range sm.sessions { if s.uuid == uuid { @@ -86,6 +86,7 @@ func (sm *SessionManager) GetSession(uuid string) *Session { return nil } +// Called on freeswitch's hearbeat event func (sm *SessionManager) OnHeartBeat(ev *Event) { if sm.sessionDelegate != nil { sm.sessionDelegate.OnHeartBeat(ev) @@ -94,6 +95,7 @@ func (sm *SessionManager) OnHeartBeat(ev *Event) { } } +// Called on freeswitch's answer event func (sm *SessionManager) OnChannelAnswer(ev *Event) { if sm.sessionDelegate != nil { s := NewSession(ev, sm.sessionDelegate) @@ -103,6 +105,7 @@ func (sm *SessionManager) OnChannelAnswer(ev *Event) { } } +// Called on freeswitch's hangup event func (sm *SessionManager) OnChannelHangupComplete(ev *Event) { s := sm.GetSession(ev.Fields[UUID]) if sm.sessionDelegate != nil { @@ -113,6 +116,8 @@ func (sm *SessionManager) OnChannelHangupComplete(ev *Event) { s.Close() } +// Called on freeswitch's events not processed by the session manger, +// for logging purposes (maybe). func (sm *SessionManager) OnOther(ev *Event) { //log.Printf("Other event: %s", ev.Fields["Event-Name"]) } diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 1b598c985..04e136ea3 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -25,10 +25,9 @@ import ( func TestConnect(t *testing.T) { sm := &SessionManager{} - sm.Connect("localhost:8021", "ClueCon") - sm.SetSessionDelegate(new(DirectSessionDelegate)) + sm.Connect(new(DirectSessionDelegate), "localhost:8021", "ClueCon") //for { - ev := sm.ReadNextEvent() + ev := sm.readNextEvent() if ev == nil { t.Error("Got nil event!") }