Adapting the code to the new fsock, some small fixups

This commit is contained in:
DanB
2013-03-28 14:46:00 +01:00
parent 48575f7752
commit 50f2bfc748
3 changed files with 0 additions and 3672 deletions

View File

@@ -1,328 +0,0 @@
/* Provides freeswitch socket communication
*/
package fsock
import (
"bufio"
"bytes"
"errors"
"fmt"
"log/syslog"
"net"
"net/url"
"sort"
"strconv"
"strings"
"time"
)
var fs *fSock // Used to share FS connection via package globals (singleton)
// Connection to FreeSWITCH Socket
type fSock struct {
conn net.Conn
buffer *bufio.Reader
fsaddress, fspaswd string
eventHandlers map[string]func(string)
eventFilters map[string]string
apiChan chan string
cmdChan chan string
reconnects int
delayFunc func() int
logger *syslog.Writer
}
// Connects to FS and starts buffering input
func New(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string), eventFilters map[string]string, l *syslog.Writer) error {
fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters, logger: l}
fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies
fs.reconnects = reconnects
fs.delayFunc = fib()
errConn := Connect(reconnects)
if errConn != nil {
return errConn
}
return nil
}
// Extracts value of a header from anywhere in content string
func headerVal(hdrs, hdr string) string {
var hdrSIdx, hdrEIdx int
if hdrSIdx = strings.Index(hdrs, hdr); hdrSIdx == -1 {
return ""
} else if hdrEIdx = strings.Index(hdrs[hdrSIdx:], "\n"); hdrEIdx == -1 {
hdrEIdx = len(hdrs[hdrSIdx:])
}
splt := strings.SplitN(hdrs[hdrSIdx:hdrSIdx+hdrEIdx], ": ", 2)
if len(splt) != 2 {
return ""
}
return strings.TrimSpace(strings.TrimRight(splt[1], "\n"))
}
// FS event header values are urlencoded. Use this to decode them. On error, use original value
func urlDecode(hdrVal string) string {
if valUnescaped, errUnescaping := url.QueryUnescape(hdrVal); errUnescaping == nil {
hdrVal = valUnescaped
}
return hdrVal
}
// Binary string search in slice
func isSliceMember(ss []string, s string) bool {
sort.Strings(ss)
if i := sort.SearchStrings(ss, s); i < len(ss) && ss[i] == s {
return true
}
return false
}
// Convert fseventStr into fseventMap
func FSEventStrToMap(fsevstr string, headers []string) map[string]string {
fsevent := make(map[string]string)
filtered := false
if len(headers) != 0 {
filtered = true
}
for _, strLn := range strings.Split(fsevstr, "\n") {
if hdrVal := strings.SplitN(strLn, ": ", 2); len(hdrVal) == 2 {
if filtered && isSliceMember(headers, hdrVal[0]) {
continue // Loop again since we only work on filtered fields
}
fsevent[hdrVal[0]] = urlDecode(strings.TrimSpace(strings.TrimRight(hdrVal[1], "\n")))
}
}
return fsevent
}
// Reads headers until delimiter reached
func readHeaders() (s string, err error) {
bytesRead := make([]byte, 0)
var readLine []byte
for {
readLine, err = fs.buffer.ReadBytes('\n')
if err != nil {
return
}
// No Error, add received to localread buffer
if len(bytes.TrimSpace(readLine)) == 0 {
break
}
bytesRead = append(bytesRead, readLine...)
}
return string(bytesRead), nil
}
// Reads the body from buffer, ln is given by content-length of headers
func readBody(ln int) (string, error) {
bytesRead := make([]byte, ln)
for i := 0; i < ln; i++ {
if readByte, err := fs.buffer.ReadByte(); err != nil {
return "", err
} else { // No Error, add received to localread buffer
bytesRead[i] = readByte // Add received line to the local read buffer
}
}
return string(bytesRead), nil
}
// Event is made out of headers and body (if present)
func readEvent() (string, string, error) {
var hdrs, body string
var cl int
var err error
if hdrs, err = readHeaders(); err != nil {
return "", "", err
}
if !strings.Contains(hdrs, "Content-Length") { //No body
return hdrs, "", nil
}
clStr := headerVal(hdrs, "Content-Length")
if cl, err = strconv.Atoi(clStr); err != nil {
return "", "", errors.New("Cannot extract content length")
}
if body, err = readBody(cl); err != nil {
return "", "", err
}
return hdrs, body, nil
}
// Checks if socket connected. Can be extended with pings
func Connected() bool {
if fs.conn == nil {
return false
}
return true
}
// Disconnects from socket
func Disconnect() (err error) {
if fs.conn != nil {
err = fs.conn.Close()
}
return
}
// Auth to FS
func Auth() error {
authCmd := fmt.Sprintf("auth %s\n\n", fs.fspaswd)
fmt.Fprint(fs.conn, authCmd)
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK accepted") {
fmt.Println("Got reply to auth:", rply)
return errors.New("auth error")
}
return nil
}
// Subscribe to events
func EventsPlain(events []string) error {
if len(events) == 0 {
return nil
}
eventsCmd := "event plain"
for _, ev := range events {
if ev == "ALL" {
eventsCmd = "event plain all"
break
}
eventsCmd += " " + ev
}
eventsCmd += "\n\n"
fmt.Fprint(fs.conn, eventsCmd) // Send command here
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
return errors.New("event error")
}
return nil
}
// Enable filters
func filterEvents(filters map[string]string) error {
if len(filters) == 0 { //Nothing to filter
return nil
}
cmd := "filter"
for hdr, val := range filters {
cmd += " " + hdr + " " + val
}
cmd += "\n\n"
fmt.Fprint(fs.conn, cmd)
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
return errors.New("filter error")
}
return nil
}
// Connect or reconnect
func Connect(reconnects int) error {
if Connected() {
Disconnect()
}
var conErr error
for i := 0; i < reconnects; i++ {
fs.conn, conErr = net.Dial("tcp", fs.fsaddress)
if conErr == nil {
// Connected, init buffer, auth and subscribe to desired events and filters
fs.buffer = bufio.NewReaderSize(fs.conn, 8192) // reinit buffer
if authChg, err := readHeaders(); err != nil || !strings.Contains(authChg, "auth/request") {
return errors.New("No auth challenge received")
} else if errAuth := Auth(); errAuth != nil { // Auth did not succeed
return errAuth
}
// Subscribe to events handled by event handlers
handledEvs := make([]string, len(fs.eventHandlers))
j := 0
for k := range fs.eventHandlers {
handledEvs[j] = k
j++
}
if subscribeErr := EventsPlain(handledEvs); subscribeErr != nil {
return subscribeErr
}
if filterErr := filterEvents(fs.eventFilters); filterErr != nil {
return filterErr
}
return nil
}
time.Sleep(time.Duration(fs.delayFunc()) * time.Second)
fs.logger.Warning("FreeSWITCH reconnect!")
}
return conErr
}
// Send API command
func SendApiCmd(cmdStr string) error {
if !Connected() {
return errors.New("Not connected to FS")
}
cmd := fmt.Sprintf("api %s\n\n", cmdStr)
fmt.Fprint(fs.conn, cmd)
resEvent := <-fs.apiChan
if strings.Contains(resEvent, "-ERR") {
return errors.New("Command failed")
}
return nil
}
// SendMessage command
func SendMsgCmd(uuid string, cmdargs map[string]string) error {
if len(cmdargs) == 0 {
return errors.New("Need command arguments")
}
if !Connected() {
return errors.New("Not connected to FS")
}
argStr := ""
for k, v := range cmdargs {
argStr += fmt.Sprintf("%s:%s\n", k, v)
}
fmt.Fprint(fs.conn, fmt.Sprintf("sendmsg %s\n%s\n", uuid, argStr))
replyTxt := <-fs.cmdChan
if strings.HasPrefix(replyTxt, "-ERR") {
return fmt.Errorf("SendMessage: %s", replyTxt)
}
return nil
}
// Reads events from socket
func ReadEvents() {
// Read events from buffer, firing them up further
for {
hdr, body, err := readEvent()
if err != nil {
fs.logger.Warning("FreeSWITCH connection broken: attemting reconnect")
connErr := Connect(fs.reconnects)
if connErr != nil {
return
}
continue // Connection reset
}
if strings.Contains(hdr, "api/response") {
fs.apiChan <- hdr + body
} else if strings.Contains(hdr, "command/reply") {
fs.cmdChan <- headerVal(hdr, "Reply-Text")
}
if body != "" { // We got a body, could be event, try dispatching it
dispatchEvent(body)
}
}
return
}
// Dispatch events to handlers in async mode
func dispatchEvent(event string) {
eventName := headerVal(event, "Event-Name")
if handlerFunc, hasHandler := fs.eventHandlers[eventName]; hasHandler {
go handlerFunc(event)
}
}
// successive Fibonacci numbers.
func fib() func() int {
a, b := 0, 1
return func() int {
a, b = b, a+b
return a
}
}

View File

@@ -1,150 +0,0 @@
package fsock
import (
"bufio"
"io/ioutil"
"os"
"testing"
"time"
)
const (
HEADER = `Content-Length: 564
Content-Type: text/event-plain
`
BODY = `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
FreeSWITCH-IPv6: %3A%3A1
Event-Date-Local: 2012-10-05%2013%3A41%3A38
Event-Date-GMT: Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT
Event-Date-Timestamp: 1349437298012866
Event-Calling-File: switch_scheduler.c
Event-Calling-Function: switch_scheduler_execute
Event-Calling-Line-Number: 65
Event-Sequence: 34263
Task-ID: 2
Task-Desc: heartbeat
Task-Group: core
Task-Runtime: 1349437318
extra data
`
)
func TestHeaders(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Error("Error creating pype!")
}
fs = &fSock{}
fs.buffer = bufio.NewReader(r)
w.Write([]byte(HEADER))
h, err := readHeaders()
if err != nil || h != "Content-Length: 564\nContent-Type: text/event-plain\n" {
t.Error("Error parsing headers: ", h, err)
}
}
func TestEvent(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Error("Error creating pype!")
}
fs = &fSock{}
fs.buffer = bufio.NewReader(r)
w.Write([]byte(HEADER + BODY))
h, b, err := readEvent()
if err != nil || h != HEADER[:len(HEADER)-1] || len(b) != 564 {
t.Error("Error parsing event: ", h, b, len(b))
}
}
func TestHeaderValMiddle(t *testing.T) {
h := headerVal(BODY, "Event-Date-GMT")
if h != "Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT" {
t.Error("Header val error: ", h)
}
}
func TestHeaderValStart(t *testing.T) {
h := headerVal(BODY, "Event-Name")
if h != "RE_SCHEDULE" {
t.Error("Header val error: ", h)
}
}
func TestHeaderValEnd(t *testing.T) {
h := headerVal(BODY, "Task-Runtime")
if h != "1349437318" {
t.Error("Header val error: ", h)
}
}
func TestEventToMapUnfiltered(t *testing.T) {
fields := FSEventStrToMap(BODY, nil)
if fields["Event-Name"] != "RE_SCHEDULE" {
t.Error("Event not parsed correctly: ", fields)
}
if len(fields) != 17 {
t.Error("Incorrect number of event fields: ", len(fields))
}
}
func TestEventToMapFiltered(t *testing.T) {
fields := FSEventStrToMap(BODY, []string{"Event-Name", "Task-Group", "Event-Date-GMT"})
if fields["Event-Date-Local"] != "2012-10-05 13:41:38" {
t.Error("Event not parsed correctly: ", fields)
}
if len(fields) != 14 {
t.Error("Incorrect number of event fields: ", len(fields))
}
}
func TestReadEvents(t *testing.T) {
data, err := ioutil.ReadFile("test_data.txt")
if err != nil {
t.Error("Error reading test data file!")
}
r, w, err := os.Pipe()
if err != nil {
t.Error("Error creating pype!")
}
fs = &fSock{}
fs.buffer = bufio.NewReader(r)
var events int32
fs.eventHandlers = map[string]func(string){
"HEARTBEAT": func(string) { events++ },
"RE_SCHEDULE": func(string) { events++ },
"CHANNEL_STATE": func(string) { events++ },
"CODEC": func(string) { events++ },
"CHANNEL_CREATE": func(string) { events++ },
"CHANNEL_CALLSTATE": func(string) { events++ },
"API": func(string) { events++ },
"CHANNEL_EXECUTE": func(string) { events++ },
"CHANNEL_EXECUTE_COMPLETE": func(string) { events++ },
"CHANNEL_PARK": func(string) { events++ },
"CHANNEL_HANGUP": func(string) { events++ },
"CHANNEL_HANGUP_COMPLETE": func(string) { events++ },
"CHANNEL_UNPARK": func(string) { events++ },
"CHANNEL_DESTROY": func(string) { events++ },
}
go ReadEvents()
w.Write(data)
time.Sleep(50 * time.Millisecond)
if events != 45 {
t.Error("Error reading events: ", events)
}
}
/*********************** Benchmarks ************************/
func BenchmarkHeaderVal(b *testing.B) {
for i := 0; i < b.N; i++ {
headerVal(HEADER, "Content-Length")
headerVal(BODY, "Event-Date-Loca")
}
}

File diff suppressed because it is too large Load Diff