more tests on fsock

This commit is contained in:
Radu Ioan Fericean
2012-10-06 14:13:17 +03:00
parent 0c23c3b7c9
commit 06c444d766
5 changed files with 124 additions and 141 deletions

View File

@@ -14,7 +14,34 @@ import (
"time"
)
var fs *fSock // Used to share FS connection via package globals
var fs *fSock // Used to share FS connection via package globals (singleton)
// Connection to FreeSWITCH Socket
type fSock struct {
conn net.Conn
buffer *bufio.Reader
fsaddress,
fspaswd string
eventHandlers map[string]func(string)
eventFilters map[string]string
apiChan chan string
reconnects int
delayFunc func() int
}
// Connects to FS and starts buffering input
func New(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error {
eventFilters := make(map[string]string)
fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters}
fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies
fs.reconnects = reconnects
fs.delayFunc = fib()
errConn := Connect(reconnects)
if errConn != nil {
return errConn
}
return nil
}
// Extracts value of a header from anywhere in content string
func headerVal(hdrs, hdr string) string {
@@ -49,7 +76,7 @@ func isSliceMember(ss []string, s string) bool {
}
// Convert fseventStr into fseventMap
func fsEventStrToMap(fsevstr string, headers []string) map[string]string {
func FSEventStrToMap(fsevstr string, headers []string) map[string]string {
fsevent := make(map[string]string)
filtered := false
if len(headers) != 0 {
@@ -66,29 +93,6 @@ func fsEventStrToMap(fsevstr string, headers []string) map[string]string {
return fsevent
}
// Connects to FS and starts buffering input
func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error {
eventFilters := make(map[string]string)
fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters}
fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies
errConn := Connect(reconnects)
if errConn != nil {
return errConn
}
return nil
}
// Connection to FreeSWITCH Socket
type fSock struct {
conn net.Conn
buffer *bufio.Reader
fsaddress,
fspaswd string
eventHandlers map[string]func(string)
eventFilters map[string]string
apiChan chan string
}
// Reads headers until delimiter reached
func readHeaders() (s string, err error) {
bytesRead := make([]byte, 0)
@@ -237,7 +241,7 @@ func Connect(reconnects int) error {
}
return nil
}
time.Sleep(time.Duration(i) * time.Second)
time.Sleep(time.Duration(fs.delayFunc()) * time.Second)
}
return conErr
}
@@ -263,7 +267,7 @@ func ReadEvents() {
hdr, body, err := readEvent()
if err != nil {
fmt.Println("WARNING: got error while reading events: ", err.Error())
connErr := Connect(3)
connErr := Connect(fs.reconnects)
if connErr != nil {
fmt.Println("FSock reader - cannot connect to FS")
return
@@ -274,7 +278,7 @@ func ReadEvents() {
fs.apiChan <- hdr + body
}
if body != "" { // We got a body, could be event, try dispatching it
DispatchEvent(body)
dispatchEvent(body)
}
}
fmt.Println("Exiting ReadEvents")
@@ -282,9 +286,18 @@ func ReadEvents() {
}
// Dispatch events to handlers in async mode
func DispatchEvent(event string) {
func dispatchEvent(event string) {
eventName := headerVal(event, "Event-Name")
if handlerFunc, hasHandler := fs.eventHandlers[eventName]; hasHandler {
go handlerFunc(event)
}
}
// successive Fibonacci numbers.
func fib() func() int {
a, b := 0, 1
return func() int {
a, b = b, a+b
return a
}
}

View File

@@ -61,6 +61,33 @@ func TestEvent(t *testing.T) {
}
}
func TestHeaderVal(t *testing.T) {
h := headerVal(BODY, "Event-Date-GMT")
if h != "Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT" {
t.Error("Header val error: ", h)
}
}
func TestEventToMapUnfiltered(t *testing.T) {
fields := FSEventStrToMap(BODY, nil)
if fields["Event-Name"] != "RE_SCHEDULE" {
t.Error("Event not parsed correctly: ", fields)
}
if len(fields) != 17 {
t.Error("Incorrect number of event fields: ", len(fields))
}
}
func TestEventToMapFiltered(t *testing.T) {
fields := FSEventStrToMap(BODY, []string{"Event-Name", "Task-Group", "Event-Date-GMT"})
if fields["Event-Date-Local"] != "2012-10-05 13:41:38" {
t.Error("Event not parsed correctly: ", fields)
}
if len(fields) != 14 {
t.Error("Incorrect number of event fields: ", len(fields))
}
}
func BenchmarkHeaderVal(b *testing.B) {
for i := 0; i < b.N; i++ {
headerVal(HEADER, "Content-Length")

View File

@@ -20,8 +20,7 @@ package sessionmanager
import (
"fmt"
"github.com/cgrates/cgrates/rater"
"regexp"
"github.com/cgrates/cgrates/fsock"
"strconv"
"strings"
"time"
@@ -29,13 +28,9 @@ import (
// Event type holding a mapping of all event's proprieties
type FSEvent struct {
Fields map[string]string
fields map[string]string
}
var (
eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`) // for parsing the proprieties
)
const (
// Freswitch event proprities names
DIRECTION = "Call-Direction"
@@ -67,7 +62,7 @@ const (
// Nice printing for the event object.
func (fsev *FSEvent) String() (result string) {
for k, v := range fsev.Fields {
for k, v := range fsev.fields {
result += fmt.Sprintf("%s = %s\n", k, v)
}
result += "=============================================================="
@@ -77,51 +72,44 @@ func (fsev *FSEvent) String() (result string) {
// Loads the new event data from a body of text containing the key value proprieties.
// It stores the parsed proprieties in the internal map.
func (fsev *FSEvent) New(body string) Event {
fsev.Fields = make(map[string]string)
for _, fields := range eventBodyRE.FindAllStringSubmatch(body, -1) {
if len(fields) == 3 {
fsev.Fields[fields[1]] = fields[2]
} else {
rater.Logger.Err(fmt.Sprintf("malformed event field: %v", fields))
}
}
fsev.fields = fsock.FSEventStrToMap(body, nil)
return fsev
}
func (fsev *FSEvent) GetName() string {
return fsev.Fields[NAME]
return fsev.fields[NAME]
}
func (fsev *FSEvent) GetDirection() string {
//TODO: temporary hack
return "OUT"
//return fsev.Fields[DIRECTION]
//return fsev.fields[DIRECTION]
}
func (fsev *FSEvent) GetOrigId() string {
return fsev.Fields[ORIG_ID]
return fsev.fields[ORIG_ID]
}
func (fsev *FSEvent) GetSubject() string {
return fsev.Fields[SUBJECT]
return fsev.fields[SUBJECT]
}
func (fsev *FSEvent) GetAccount() string {
return fsev.Fields[ACCOUNT]
return fsev.fields[ACCOUNT]
}
func (fsev *FSEvent) GetDestination() string {
return fsev.Fields[DESTINATION]
return fsev.fields[DESTINATION]
}
func (fsev *FSEvent) GetTOR() string {
return fsev.Fields[TOR]
return fsev.fields[TOR]
}
func (fsev *FSEvent) GetUUID() string {
return fsev.Fields[UUID]
return fsev.fields[UUID]
}
func (fsev *FSEvent) GetTenant() string {
return fsev.Fields[CSTMID]
return fsev.fields[CSTMID]
}
func (fsev *FSEvent) GetCallDestNb() string {
return fsev.Fields[CALL_DEST_NB]
return fsev.fields[CALL_DEST_NB]
}
func (fsev *FSEvent) GetReqType() string {
return fsev.Fields[REQTYPE]
return fsev.fields[REQTYPE]
}
func (fsev *FSEvent) MissingParameter() bool {
return strings.TrimSpace(fsev.GetDirection()) == "" ||
@@ -135,13 +123,13 @@ func (fsev *FSEvent) MissingParameter() bool {
strings.TrimSpace(fsev.GetCallDestNb()) == ""
}
func (fsev *FSEvent) GetStartTime(field string) (t time.Time, err error) {
st, err := strconv.ParseInt(fsev.Fields[field], 0, 64)
st, err := strconv.ParseInt(fsev.fields[field], 0, 64)
t = time.Unix(0, st*1000)
return
}
func (fsev *FSEvent) GetEndTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsev.Fields[END_TIME], 0, 64)
st, err := strconv.ParseInt(fsev.fields[END_TIME], 0, 64)
t = time.Unix(0, st*1000)
return
}

View File

@@ -45,9 +45,9 @@ func TestEventCreation(t *testing.T) {
"Idle-CPU": "100.000000"`
ev := new(FSEvent).New(body)
if ev.GetName() != "HEARTBEAT" {
t.Error("Event not parsed correctly!")
t.Error("Event not parsed correctly: ", ev)
}
if len(ev.(*FSEvent).Fields) != 20 {
if len(ev.(*FSEvent).fields) != 20 {
t.Error("Incorrect number of event fields!")
}
}

View File

@@ -37,86 +37,47 @@ type FSSessionManager struct {
connector rater.Connector
debitPeriod time.Duration
loggerDB rater.DataStorage
delayFunc func() int
}
func NewFSSessionManager(storage rater.DataStorage, connector rater.Connector, debitPeriod time.Duration) *FSSessionManager {
return &FSSessionManager{loggerDB: storage}
return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events in json format.
func (sm *FSSessionManager) Connect(address, pass string) error {
sm.address = address
sm.pass = pass
if sm.conn != nil {
// in case it is a reconnect
sm.conn.Close()
}
conn, err := net.Dial("tcp", address)
if err != nil {
rater.Logger.Warning("Could not connect to freeswitch server!")
return err
}
sm.conn = conn
sm.buf = bufio.NewReaderSize(conn, 8192)
fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass))
fmt.Fprint(conn, "event json HEARTBEAT CHANNEL_ANSWER CHANNEL_HANGUP_COMPLETE CHANNEL_PARK\n\n")
fmt.Fprint(conn, "filter Call-Direction inbound\n\n")
handlers := make(map[string]func(string))
handlers["HEARTBEAT"] = func(s string) { fmt.Println(s) } // Example handler
if officer.FS, err = officer.NewFSock(config.FS_SOCK, config.FS_PASWD, 3, handlers); err != nil {
fmt.Println("FreeSWITCH error:", err)
exitChan <- true
func (sm *FSSessionManager) Connect(address, pass string) (err error) {
if err = fsock.New(address, pass, 3, sm.createHandlers()); err != nil {
rater.Logger.Crit(fmt.Sprintf("FreeSWITCH error:", err))
return
} else if officer.FS.Connected() {
fmt.Println("Successfully connected to FreeSWITCH")
} else if fsock.Connected() {
rater.Logger.Info("Successfully connected to FreeSWITCH")
}
officer.FS.ReadEvents()
exitChan <- true // If we have reached here something went wrong
go func() {
sm.delayFunc = fib()
exitChan := make(chan bool)
for {
select {
case <-exitChan:
break
default:
sm.readNextEvent(exitChan)
}
}
}()
fsock.ReadEvents()
return nil
}
// Reads from freeswitch server buffer until it encounters a '}',
// than it creates an event object and calls the appropriate method
func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) {
body, err := sm.buf.ReadString('}')
if err != nil {
rater.Logger.Warning("Could not read from freeswitch connection!")
// wait until a sec
time.Sleep(time.Duration(sm.delayFunc()) * time.Second)
// try to reconnect
err = sm.Connect(sm.address, sm.pass)
if err == nil {
rater.Logger.Info("Successfuly reconnected to freeswitch! ")
exitChan <- true
}
}
ev = new(FSEvent).New(body)
switch ev.GetName() {
case HEARTBEAT:
func (sm *FSSessionManager) createHandlers() (handlers map[string]func(string)) {
handlers = make(map[string]func(string))
hb := func(body string) {
ev := new(FSEvent).New(body)
sm.OnHeartBeat(ev)
case PARK:
}
cp := func(body string) {
ev := new(FSEvent).New(body)
sm.OnChannelPark(ev)
case ANSWER:
}
ca := func(body string) {
ev := new(FSEvent).New(body)
sm.OnChannelAnswer(ev)
case HANGUP:
}
ch := func(body string) {
ev := new(FSEvent).New(body)
sm.OnChannelHangupComplete(ev)
}
handlers["HEARTBEAT"] = hb
handlers["CHANNEL_PARK"] = cp
handlers["CHANNEL_ANSWER"] = ca
handlers["CHANNEL_HANGUP_COMPLETE "] = ch
return
}
@@ -132,15 +93,14 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) {
fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify))
fmt.Fprint(sm.conn, fmt.Sprintf("SendMsg %s\ncall-command: hangup\nhangup-cause: MANAGER_REQUEST\n\n", s.uuid))
fsock.Disconnect()
s.Close()
}
// Sends the transfer command to unpark the call to freeswitch
func (sm *FSSessionManager) UnparkCall(uuid, call_dest_nb, notify string) {
fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_transfer %s %s\n\n", uuid, call_dest_nb))
func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
fsock.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
fsock.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb))
}
func (sm *FSSessionManager) OnHeartBeat(ev Event) {
@@ -159,7 +119,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
return
}
if ev.MissingParameter() {
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER)
rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
@@ -176,20 +136,24 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
err = sm.connector.GetMaxSessionTime(cd, &remainingSeconds)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR)
return
}
rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds))
if remainingSeconds == 0 {
rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey()))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS)
return
}
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK)
}
func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
rater.Logger.Info("freeswitch answer")
s := NewSession(ev, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
}
}
func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
@@ -327,12 +291,3 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
func (sm *FSSessionManager) GetDbLogger() rater.DataStorage {
return sm.loggerDB
}
// successive Fibonacci numbers.
func fib() func() int {
a, b := 0, 1
return func() int {
a, b = b, a+b
return a
}
}