From 06c444d76671099b72292cd033b9dddb3a90d284 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 6 Oct 2012 14:13:17 +0300 Subject: [PATCH] more tests on fsock --- fsock/fsock.go | 71 ++++++++++------- fsock/fsock_test.go | 27 +++++++ sessionmanager/fsevent.go | 46 +++++------- sessionmanager/fsevent_test.go | 4 +- sessionmanager/fssessionmanager.go | 117 +++++++++-------------------- 5 files changed, 124 insertions(+), 141 deletions(-) diff --git a/fsock/fsock.go b/fsock/fsock.go index ebb41196f..2c3553d73 100644 --- a/fsock/fsock.go +++ b/fsock/fsock.go @@ -14,7 +14,34 @@ import ( "time" ) -var fs *fSock // Used to share FS connection via package globals +var fs *fSock // Used to share FS connection via package globals (singleton) + +// Connection to FreeSWITCH Socket +type fSock struct { + conn net.Conn + buffer *bufio.Reader + fsaddress, + fspaswd string + eventHandlers map[string]func(string) + eventFilters map[string]string + apiChan chan string + reconnects int + delayFunc func() int +} + +// Connects to FS and starts buffering input +func New(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error { + eventFilters := make(map[string]string) + fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters} + fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies + fs.reconnects = reconnects + fs.delayFunc = fib() + errConn := Connect(reconnects) + if errConn != nil { + return errConn + } + return nil +} // Extracts value of a header from anywhere in content string func headerVal(hdrs, hdr string) string { @@ -49,7 +76,7 @@ func isSliceMember(ss []string, s string) bool { } // Convert fseventStr into fseventMap -func fsEventStrToMap(fsevstr string, headers []string) map[string]string { +func FSEventStrToMap(fsevstr string, headers []string) map[string]string { fsevent := make(map[string]string) filtered := false if len(headers) != 0 { @@ -66,29 +93,6 @@ func fsEventStrToMap(fsevstr string, headers []string) map[string]string { return fsevent } -// Connects to FS and starts buffering input -func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error { - eventFilters := make(map[string]string) - fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters} - fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies - errConn := Connect(reconnects) - if errConn != nil { - return errConn - } - return nil -} - -// Connection to FreeSWITCH Socket -type fSock struct { - conn net.Conn - buffer *bufio.Reader - fsaddress, - fspaswd string - eventHandlers map[string]func(string) - eventFilters map[string]string - apiChan chan string -} - // Reads headers until delimiter reached func readHeaders() (s string, err error) { bytesRead := make([]byte, 0) @@ -237,7 +241,7 @@ func Connect(reconnects int) error { } return nil } - time.Sleep(time.Duration(i) * time.Second) + time.Sleep(time.Duration(fs.delayFunc()) * time.Second) } return conErr } @@ -263,7 +267,7 @@ func ReadEvents() { hdr, body, err := readEvent() if err != nil { fmt.Println("WARNING: got error while reading events: ", err.Error()) - connErr := Connect(3) + connErr := Connect(fs.reconnects) if connErr != nil { fmt.Println("FSock reader - cannot connect to FS") return @@ -274,7 +278,7 @@ func ReadEvents() { fs.apiChan <- hdr + body } if body != "" { // We got a body, could be event, try dispatching it - DispatchEvent(body) + dispatchEvent(body) } } fmt.Println("Exiting ReadEvents") @@ -282,9 +286,18 @@ func ReadEvents() { } // Dispatch events to handlers in async mode -func DispatchEvent(event string) { +func dispatchEvent(event string) { eventName := headerVal(event, "Event-Name") if handlerFunc, hasHandler := fs.eventHandlers[eventName]; hasHandler { go handlerFunc(event) } } + +// successive Fibonacci numbers. +func fib() func() int { + a, b := 0, 1 + return func() int { + a, b = b, a+b + return a + } +} diff --git a/fsock/fsock_test.go b/fsock/fsock_test.go index c9fedca0c..c66ba5b26 100644 --- a/fsock/fsock_test.go +++ b/fsock/fsock_test.go @@ -61,6 +61,33 @@ func TestEvent(t *testing.T) { } } +func TestHeaderVal(t *testing.T) { + h := headerVal(BODY, "Event-Date-GMT") + if h != "Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT" { + t.Error("Header val error: ", h) + } +} + +func TestEventToMapUnfiltered(t *testing.T) { + fields := FSEventStrToMap(BODY, nil) + if fields["Event-Name"] != "RE_SCHEDULE" { + t.Error("Event not parsed correctly: ", fields) + } + if len(fields) != 17 { + t.Error("Incorrect number of event fields: ", len(fields)) + } +} + +func TestEventToMapFiltered(t *testing.T) { + fields := FSEventStrToMap(BODY, []string{"Event-Name", "Task-Group", "Event-Date-GMT"}) + if fields["Event-Date-Local"] != "2012-10-05 13:41:38" { + t.Error("Event not parsed correctly: ", fields) + } + if len(fields) != 14 { + t.Error("Incorrect number of event fields: ", len(fields)) + } +} + func BenchmarkHeaderVal(b *testing.B) { for i := 0; i < b.N; i++ { headerVal(HEADER, "Content-Length") diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 9514e8a98..f8c52e83c 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -20,8 +20,7 @@ package sessionmanager import ( "fmt" - "github.com/cgrates/cgrates/rater" - "regexp" + "github.com/cgrates/cgrates/fsock" "strconv" "strings" "time" @@ -29,13 +28,9 @@ import ( // Event type holding a mapping of all event's proprieties type FSEvent struct { - Fields map[string]string + fields map[string]string } -var ( - eventBodyRE = regexp.MustCompile(`"(.*?)":\s+"(.*?)"`) // for parsing the proprieties -) - const ( // Freswitch event proprities names DIRECTION = "Call-Direction" @@ -67,7 +62,7 @@ const ( // Nice printing for the event object. func (fsev *FSEvent) String() (result string) { - for k, v := range fsev.Fields { + for k, v := range fsev.fields { result += fmt.Sprintf("%s = %s\n", k, v) } result += "==============================================================" @@ -77,51 +72,44 @@ func (fsev *FSEvent) String() (result string) { // Loads the new event data from a body of text containing the key value proprieties. // It stores the parsed proprieties in the internal map. func (fsev *FSEvent) New(body string) Event { - fsev.Fields = make(map[string]string) - for _, fields := range eventBodyRE.FindAllStringSubmatch(body, -1) { - if len(fields) == 3 { - fsev.Fields[fields[1]] = fields[2] - } else { - rater.Logger.Err(fmt.Sprintf("malformed event field: %v", fields)) - } - } + fsev.fields = fsock.FSEventStrToMap(body, nil) return fsev } func (fsev *FSEvent) GetName() string { - return fsev.Fields[NAME] + return fsev.fields[NAME] } func (fsev *FSEvent) GetDirection() string { //TODO: temporary hack return "OUT" - //return fsev.Fields[DIRECTION] + //return fsev.fields[DIRECTION] } func (fsev *FSEvent) GetOrigId() string { - return fsev.Fields[ORIG_ID] + return fsev.fields[ORIG_ID] } func (fsev *FSEvent) GetSubject() string { - return fsev.Fields[SUBJECT] + return fsev.fields[SUBJECT] } func (fsev *FSEvent) GetAccount() string { - return fsev.Fields[ACCOUNT] + return fsev.fields[ACCOUNT] } func (fsev *FSEvent) GetDestination() string { - return fsev.Fields[DESTINATION] + return fsev.fields[DESTINATION] } func (fsev *FSEvent) GetTOR() string { - return fsev.Fields[TOR] + return fsev.fields[TOR] } func (fsev *FSEvent) GetUUID() string { - return fsev.Fields[UUID] + return fsev.fields[UUID] } func (fsev *FSEvent) GetTenant() string { - return fsev.Fields[CSTMID] + return fsev.fields[CSTMID] } func (fsev *FSEvent) GetCallDestNb() string { - return fsev.Fields[CALL_DEST_NB] + return fsev.fields[CALL_DEST_NB] } func (fsev *FSEvent) GetReqType() string { - return fsev.Fields[REQTYPE] + return fsev.fields[REQTYPE] } func (fsev *FSEvent) MissingParameter() bool { return strings.TrimSpace(fsev.GetDirection()) == "" || @@ -135,13 +123,13 @@ func (fsev *FSEvent) MissingParameter() bool { strings.TrimSpace(fsev.GetCallDestNb()) == "" } func (fsev *FSEvent) GetStartTime(field string) (t time.Time, err error) { - st, err := strconv.ParseInt(fsev.Fields[field], 0, 64) + st, err := strconv.ParseInt(fsev.fields[field], 0, 64) t = time.Unix(0, st*1000) return } func (fsev *FSEvent) GetEndTime() (t time.Time, err error) { - st, err := strconv.ParseInt(fsev.Fields[END_TIME], 0, 64) + st, err := strconv.ParseInt(fsev.fields[END_TIME], 0, 64) t = time.Unix(0, st*1000) return } diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go index c947b6a4e..ace79619b 100644 --- a/sessionmanager/fsevent_test.go +++ b/sessionmanager/fsevent_test.go @@ -45,9 +45,9 @@ func TestEventCreation(t *testing.T) { "Idle-CPU": "100.000000"` ev := new(FSEvent).New(body) if ev.GetName() != "HEARTBEAT" { - t.Error("Event not parsed correctly!") + t.Error("Event not parsed correctly: ", ev) } - if len(ev.(*FSEvent).Fields) != 20 { + if len(ev.(*FSEvent).fields) != 20 { t.Error("Incorrect number of event fields!") } } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 7fce1b14e..d42031251 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -37,86 +37,47 @@ type FSSessionManager struct { connector rater.Connector debitPeriod time.Duration loggerDB rater.DataStorage - delayFunc func() int } func NewFSSessionManager(storage rater.DataStorage, connector rater.Connector, debitPeriod time.Duration) *FSSessionManager { - return &FSSessionManager{loggerDB: storage} + return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod} } // Connects to the freeswitch mod_event_socket server and starts // listening for events in json format. -func (sm *FSSessionManager) Connect(address, pass string) error { - sm.address = address - sm.pass = pass - if sm.conn != nil { - // in case it is a reconnect - sm.conn.Close() - } - conn, err := net.Dial("tcp", address) - if err != nil { - rater.Logger.Warning("Could not connect to freeswitch server!") - return err - } - sm.conn = conn - sm.buf = bufio.NewReaderSize(conn, 8192) - fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass)) - fmt.Fprint(conn, "event json HEARTBEAT CHANNEL_ANSWER CHANNEL_HANGUP_COMPLETE CHANNEL_PARK\n\n") - fmt.Fprint(conn, "filter Call-Direction inbound\n\n") - - handlers := make(map[string]func(string)) - handlers["HEARTBEAT"] = func(s string) { fmt.Println(s) } // Example handler - if officer.FS, err = officer.NewFSock(config.FS_SOCK, config.FS_PASWD, 3, handlers); err != nil { - fmt.Println("FreeSWITCH error:", err) - exitChan <- true +func (sm *FSSessionManager) Connect(address, pass string) (err error) { + if err = fsock.New(address, pass, 3, sm.createHandlers()); err != nil { + rater.Logger.Crit(fmt.Sprintf("FreeSWITCH error:", err)) return - } else if officer.FS.Connected() { - fmt.Println("Successfully connected to FreeSWITCH") + } else if fsock.Connected() { + rater.Logger.Info("Successfully connected to FreeSWITCH") } - officer.FS.ReadEvents() - exitChan <- true // If we have reached here something went wrong - - go func() { - sm.delayFunc = fib() - exitChan := make(chan bool) - for { - select { - case <-exitChan: - break - default: - sm.readNextEvent(exitChan) - } - } - }() + fsock.ReadEvents() return nil } -// Reads from freeswitch server buffer until it encounters a '}', -// than it creates an event object and calls the appropriate method -func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) { - body, err := sm.buf.ReadString('}') - if err != nil { - rater.Logger.Warning("Could not read from freeswitch connection!") - // wait until a sec - time.Sleep(time.Duration(sm.delayFunc()) * time.Second) - // try to reconnect - err = sm.Connect(sm.address, sm.pass) - if err == nil { - rater.Logger.Info("Successfuly reconnected to freeswitch! ") - exitChan <- true - } - } - ev = new(FSEvent).New(body) - switch ev.GetName() { - case HEARTBEAT: +func (sm *FSSessionManager) createHandlers() (handlers map[string]func(string)) { + handlers = make(map[string]func(string)) + hb := func(body string) { + ev := new(FSEvent).New(body) sm.OnHeartBeat(ev) - case PARK: + } + cp := func(body string) { + ev := new(FSEvent).New(body) sm.OnChannelPark(ev) - case ANSWER: + } + ca := func(body string) { + ev := new(FSEvent).New(body) sm.OnChannelAnswer(ev) - case HANGUP: + } + ch := func(body string) { + ev := new(FSEvent).New(body) sm.OnChannelHangupComplete(ev) } + handlers["HEARTBEAT"] = hb + handlers["CHANNEL_PARK"] = cp + handlers["CHANNEL_ANSWER"] = ca + handlers["CHANNEL_HANGUP_COMPLETE "] = ch return } @@ -132,15 +93,14 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { // Disconnects a session by sending hangup command to freeswitch func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) { - fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify)) - fmt.Fprint(sm.conn, fmt.Sprintf("SendMsg %s\ncall-command: hangup\nhangup-cause: MANAGER_REQUEST\n\n", s.uuid)) + fsock.Disconnect() s.Close() } // Sends the transfer command to unpark the call to freeswitch -func (sm *FSSessionManager) UnparkCall(uuid, call_dest_nb, notify string) { - fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) - fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_transfer %s %s\n\n", uuid, call_dest_nb)) +func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { + fsock.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) + fsock.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)) } func (sm *FSSessionManager) OnHeartBeat(ev Event) { @@ -159,7 +119,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { return } if ev.MissingParameter() { - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER) + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER) rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) return } @@ -176,20 +136,24 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { err = sm.connector.GetMaxSessionTime(cd, &remainingSeconds) if err != nil { rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR) + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR) return } rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds)) if remainingSeconds == 0 { rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey())) - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS) + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS) return } - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK) + sm.unparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK) } func (sm *FSSessionManager) OnChannelAnswer(ev Event) { rater.Logger.Info("freeswitch answer") + s := NewSession(ev, sm) + if s != nil { + sm.sessions = append(sm.sessions, s) + } } func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { @@ -327,12 +291,3 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration { func (sm *FSSessionManager) GetDbLogger() rater.DataStorage { return sm.loggerDB } - -// successive Fibonacci numbers. -func fib() func() int { - a, b := 0, 1 - return func() int { - a, b = b, a+b - return a - } -}