hidden fsock inside package

This commit is contained in:
Radu Ioan Fericean
2012-10-06 12:58:43 +03:00
parent 0126429b6c
commit 0c23c3b7c9
7 changed files with 263 additions and 346 deletions

View File

@@ -255,9 +255,9 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage)
}
switch sm_switch_type {
case FS:
sm := sessionmanager.NewFSSessionManager(loggerDb)
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", sm_debit_period))
sm.Connect(&sessionmanager.SessionDelegate{Connector: connector, DebitPeriod: dp}, freeswitch_server, freeswitch_pass)
sm := sessionmanager.NewFSSessionManager(loggerDb, connectorm, dp)
sm.Connect(freeswitch_server, freeswitch_pass)
default:
rater.Logger.Err(fmt.Sprintf("Cannot start session manger of type: %s!", sm_switch_type))
exitChan <- true

View File

@@ -14,10 +14,10 @@ import (
"time"
)
var FS *FSock // Used to share FS connection via package globals
var fs *fSock // Used to share FS connection via package globals
// Extracts value of a header from anywhere in content string
func HeaderVal(hdrs, hdr string) string {
func headerVal(hdrs, hdr string) string {
var hdrSIdx, hdrEIdx int
if hdrSIdx = strings.Index(hdrs, hdr); hdrSIdx == -1 {
return ""
@@ -32,7 +32,7 @@ func HeaderVal(hdrs, hdr string) string {
}
// FS event header values are urlencoded. Use this to decode them. On error, use original value
func UrlDecode(hdrVal string) string {
func urlDecode(hdrVal string) string {
if valUnescaped, errUnescaping := url.QueryUnescape(hdrVal); errUnescaping == nil {
hdrVal = valUnescaped
}
@@ -40,7 +40,7 @@ func UrlDecode(hdrVal string) string {
}
// Binary string search in slice
func IsSliceMember(ss []string, s string) bool {
func isSliceMember(ss []string, s string) bool {
sort.Strings(ss)
if i := sort.SearchStrings(ss, s); i < len(ss) && ss[i] == s {
return true
@@ -49,7 +49,7 @@ func IsSliceMember(ss []string, s string) bool {
}
// Convert fseventStr into fseventMap
func FSEventStrToMap(fsevstr string, headers []string) map[string]string {
func fsEventStrToMap(fsevstr string, headers []string) map[string]string {
fsevent := make(map[string]string)
filtered := false
if len(headers) != 0 {
@@ -57,29 +57,29 @@ func FSEventStrToMap(fsevstr string, headers []string) map[string]string {
}
for _, strLn := range strings.Split(fsevstr, "\n") {
if hdrVal := strings.SplitN(strLn, ": ", 2); len(hdrVal) == 2 {
if filtered && IsSliceMember(headers, hdrVal[0]) {
if filtered && isSliceMember(headers, hdrVal[0]) {
continue // Loop again since we only work on filtered fields
}
fsevent[hdrVal[0]] = UrlDecode(strings.TrimSpace(strings.TrimRight(hdrVal[1], "\n")))
fsevent[hdrVal[0]] = urlDecode(strings.TrimSpace(strings.TrimRight(hdrVal[1], "\n")))
}
}
return fsevent
}
// Connects to FS and starts buffering input
func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) (*FSock, error) {
func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error {
eventFilters := make(map[string]string)
fsock := FSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters}
fsock.apiChan = make(chan string) // Init apichan so we can use it to pass api replies
errConn := fsock.Connect(reconnects)
fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters}
fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies
errConn := Connect(reconnects)
if errConn != nil {
return nil, errConn
return errConn
}
return &fsock, nil
return nil
}
// Connection to FreeSWITCH Socket
type FSock struct {
type fSock struct {
conn net.Conn
buffer *bufio.Reader
fsaddress,
@@ -90,11 +90,11 @@ type FSock struct {
}
// Reads headers until delimiter reached
func (self *FSock) readHeaders() (s string, err error) {
func readHeaders() (s string, err error) {
bytesRead := make([]byte, 0)
var readLine []byte
for {
readLine, err = self.buffer.ReadBytes('\n')
readLine, err = fs.buffer.ReadBytes('\n')
if err != nil {
return
}
@@ -108,9 +108,9 @@ func (self *FSock) readHeaders() (s string, err error) {
}
// Reads the body from buffer, ln is given by content-length of headers
func (self *FSock) readBody(ln int) (s string, err error) {
func readBody(ln int) (s string, err error) {
bytesRead := make([]byte, ln)
n, err := self.buffer.Read(bytesRead)
n, err := fs.buffer.Read(bytesRead)
if err != nil {
return
}
@@ -122,46 +122,46 @@ func (self *FSock) readBody(ln int) (s string, err error) {
}
// Event is made out of headers and body (if present)
func (self *FSock) readEvent() (string, string, error) {
func readEvent() (string, string, error) {
var hdrs, body string
var cl int
var err error
if hdrs, err = self.readHeaders(); err != nil {
if hdrs, err = readHeaders(); err != nil {
return "", "", err
}
if !strings.Contains(hdrs, "Content-Length") { //No body
return hdrs, "", nil
}
clStr := HeaderVal(hdrs, "Content-Length")
clStr := headerVal(hdrs, "Content-Length")
if cl, err = strconv.Atoi(clStr); err != nil {
return "", "", errors.New("Cannot extract content length")
}
if body, err = self.readBody(cl); err != nil {
if body, err = readBody(cl); err != nil {
return "", "", errors.New("Cannot extract body")
}
return hdrs, body, nil
}
// Checks if socket connected. Can be extended with pings
func (self *FSock) Connected() bool {
if self.conn == nil {
func Connected() bool {
if fs.conn == nil {
return false
}
return true
}
// Disconnects from socket
func (self *FSock) Disconnect() {
if self.conn != nil {
self.conn.Close()
func Disconnect() {
if fs.conn != nil {
fs.conn.Close()
}
}
// Auth to FS
func (self *FSock) Auth() error {
authCmd := fmt.Sprintf("auth %s\n\n", self.fspaswd)
fmt.Fprint(self.conn, authCmd)
if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK accepted") {
func Auth() error {
authCmd := fmt.Sprintf("auth %s\n\n", fs.fspaswd)
fmt.Fprint(fs.conn, authCmd)
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK accepted") {
fmt.Println("Got reply to auth:", rply)
return errors.New("auth error")
}
@@ -169,7 +169,7 @@ func (self *FSock) Auth() error {
}
// Subscribe to events
func (self *FSock) EventsPlain(events []string) error {
func EventsPlain(events []string) error {
if len(events) == 0 {
return nil
}
@@ -182,15 +182,15 @@ func (self *FSock) EventsPlain(events []string) error {
eventsCmd += " " + ev
}
eventsCmd += "\n\n"
fmt.Fprint(self.conn, eventsCmd) // Send command here
if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
fmt.Fprint(fs.conn, eventsCmd) // Send command here
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
return errors.New("event error")
}
return nil
}
// Enable filters
func (self *FSock) FilterEvents(filters map[string]string) error {
func FilterEvents(filters map[string]string) error {
if len(filters) == 0 { //Nothing to filter
return nil
}
@@ -199,40 +199,40 @@ func (self *FSock) FilterEvents(filters map[string]string) error {
cmd += " " + hdr + " " + val
}
cmd += "\n\n"
fmt.Fprint(self.conn, cmd)
if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
fmt.Fprint(fs.conn, cmd)
if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") {
return errors.New("filter error")
}
return nil
}
// Connect or reconnect
func (self *FSock) Connect(reconnects int) error {
if self.Connected() {
self.Disconnect()
func Connect(reconnects int) error {
if Connected() {
Disconnect()
}
var conErr error
for i := 0; i < reconnects; i++ {
fmt.Println("Attempting FS connect")
self.conn, conErr = net.Dial("tcp", self.fsaddress)
fs.conn, conErr = net.Dial("tcp", fs.fsaddress)
if conErr == nil {
// Connected, init buffer, auth and subscribe to desired events and filters
self.buffer = bufio.NewReaderSize(self.conn, 8192) // reinit buffer
if authChg, err := self.readHeaders(); err != nil || !strings.Contains(authChg, "auth/request") {
fs.buffer = bufio.NewReaderSize(fs.conn, 8192) // reinit buffer
if authChg, err := readHeaders(); err != nil || !strings.Contains(authChg, "auth/request") {
return errors.New("No auth challenge received")
} else if errAuth := self.Auth(); errAuth != nil { // Auth did not succeed
} else if errAuth := Auth(); errAuth != nil { // Auth did not succeed
return errAuth
}
// Subscribe to events handled by event handlers
handledEvs := make([]string, len(self.eventHandlers))
handledEvs := make([]string, len(fs.eventHandlers))
j := 0
for k := range self.eventHandlers {
for k := range fs.eventHandlers {
handledEvs[j] = k
j++
}
if subscribeErr := self.EventsPlain(handledEvs); subscribeErr != nil {
if subscribeErr := EventsPlain(handledEvs); subscribeErr != nil {
return subscribeErr
} else if filterErr := self.FilterEvents(self.eventFilters); filterErr != nil {
} else if filterErr := FilterEvents(fs.eventFilters); filterErr != nil {
return filterErr
}
return nil
@@ -243,13 +243,13 @@ func (self *FSock) Connect(reconnects int) error {
}
// Send API command
func (self *FSock) SendApiCmd(cmdStr string) error {
if !self.Connected() {
func SendApiCmd(cmdStr string) error {
if !Connected() {
return errors.New("Not connected to FS")
}
cmd := fmt.Sprintf("api %s\n\n", cmdStr)
fmt.Fprint(self.conn, cmd)
resEvent := <-self.apiChan
fmt.Fprint(fs.conn, cmd)
resEvent := <-fs.apiChan
if strings.Contains(resEvent, "-ERR") {
return errors.New("Command failed")
}
@@ -257,13 +257,13 @@ func (self *FSock) SendApiCmd(cmdStr string) error {
}
// Reads events from socket
func (self *FSock) ReadEvents() {
func ReadEvents() {
// Read events from buffer, firing them up further
for {
hdr, body, err := self.readEvent()
hdr, body, err := readEvent()
if err != nil {
fmt.Println("WARNING: got error while reading events: ", err.Error())
connErr := self.Connect(3)
connErr := Connect(3)
if connErr != nil {
fmt.Println("FSock reader - cannot connect to FS")
return
@@ -271,10 +271,10 @@ func (self *FSock) ReadEvents() {
continue // Connection reset
}
if strings.Contains(hdr, "api/response") {
self.apiChan <- hdr + body
fs.apiChan <- hdr + body
}
if body != "" { // We got a body, could be event, try dispatching it
self.DispatchEvent(body)
DispatchEvent(body)
}
}
fmt.Println("Exiting ReadEvents")
@@ -282,9 +282,9 @@ func (self *FSock) ReadEvents() {
}
// Dispatch events to handlers in async mode
func (self *FSock) DispatchEvent(event string) {
eventName := HeaderVal(event, "Event-Name")
if handlerFunc, hasHandler := self.eventHandlers[eventName]; hasHandler {
func DispatchEvent(event string) {
eventName := headerVal(event, "Event-Name")
if handlerFunc, hasHandler := fs.eventHandlers[eventName]; hasHandler {
go handlerFunc(event)
}
}

View File

@@ -38,10 +38,10 @@ func TestHeaders(t *testing.T) {
if err != nil {
t.Error("Error creating pype!")
}
fs := FSock{}
fs = &fSock{}
fs.buffer = bufio.NewReader(r)
w.Write([]byte(HEADER))
h, err := fs.readHeaders()
h, err := readHeaders()
if err != nil || h != "Content-Length: 564\nContent-Type: text/event-plain\n" {
t.Error("Error parsing headers: ", h, err)
}
@@ -52,10 +52,10 @@ func TestEvent(t *testing.T) {
if err != nil {
t.Error("Error creating pype!")
}
fs := FSock{}
fs = &fSock{}
fs.buffer = bufio.NewReader(r)
w.Write([]byte(HEADER + BODY))
h, b, err := fs.readEvent()
h, b, err := readEvent()
if err != nil || h != HEADER[:len(HEADER)-1] || len(b) != 564 {
t.Error("Error parsing event: ", h, b, err)
}
@@ -63,8 +63,7 @@ func TestEvent(t *testing.T) {
func BenchmarkHeaderVal(b *testing.B) {
for i := 0; i < b.N; i++ {
HeaderVal(HEADER, "Content-Length")
HeaderVal(BODY, "Event-Date-Loca")
headerVal(HEADER, "Content-Length")
headerVal(BODY, "Event-Date-Loca")
}
}

View File

@@ -20,37 +20,33 @@ package sessionmanager
import (
"bufio"
"errors"
"fmt"
"github.com/cgrates/cgrates/fsock"
"github.com/cgrates/cgrates/rater"
"net"
"strings"
"time"
)
// The freeswitch session manager type holding a buffer for the network connection,
// the active sessions, and a session delegate doing specific actions on every session.
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
conn net.Conn
buf *bufio.Reader
sessions []*Session
sessionDelegate *SessionDelegate
loggerDB rater.DataStorage
address, pass string
delayFunc func() int
conn net.Conn
buf *bufio.Reader
sessions []*Session
connector rater.Connector
debitPeriod time.Duration
loggerDB rater.DataStorage
delayFunc func() int
}
func NewFSSessionManager(storage rater.DataStorage) *FSSessionManager {
func NewFSSessionManager(storage rater.DataStorage, connector rater.Connector, debitPeriod time.Duration) *FSSessionManager {
return &FSSessionManager{loggerDB: storage}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events in json format.
func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) error {
if ed == nil {
rater.Logger.Crit("Please provide a non nil SessionDelegate")
return errors.New("nil session delegate")
}
sm.sessionDelegate = ed
func (sm *FSSessionManager) Connect(address, pass string) error {
sm.address = address
sm.pass = pass
if sm.conn != nil {
@@ -67,6 +63,19 @@ func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) e
fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass))
fmt.Fprint(conn, "event json HEARTBEAT CHANNEL_ANSWER CHANNEL_HANGUP_COMPLETE CHANNEL_PARK\n\n")
fmt.Fprint(conn, "filter Call-Direction inbound\n\n")
handlers := make(map[string]func(string))
handlers["HEARTBEAT"] = func(s string) { fmt.Println(s) } // Example handler
if officer.FS, err = officer.NewFSock(config.FS_SOCK, config.FS_PASWD, 3, handlers); err != nil {
fmt.Println("FreeSWITCH error:", err)
exitChan <- true
return
} else if officer.FS.Connected() {
fmt.Println("Successfully connected to FreeSWITCH")
}
officer.FS.ReadEvents()
exitChan <- true // If we have reached here something went wrong
go func() {
sm.delayFunc = fib()
exitChan := make(chan bool)
@@ -84,7 +93,6 @@ func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) e
// Reads from freeswitch server buffer until it encounters a '}',
// than it creates an event object and calls the appropriate method
// on the session delegate.
func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) {
body, err := sm.buf.ReadString('}')
if err != nil {
@@ -92,7 +100,7 @@ func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) {
// wait until a sec
time.Sleep(time.Duration(sm.delayFunc()) * time.Second)
// try to reconnect
err = sm.Connect(sm.sessionDelegate, sm.address, sm.pass)
err = sm.Connect(sm.address, sm.pass)
if err == nil {
rater.Logger.Info("Successfuly reconnected to freeswitch! ")
exitChan <- true
@@ -108,8 +116,6 @@ func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) {
sm.OnChannelAnswer(ev)
case HANGUP:
sm.OnChannelHangupComplete(ev)
default:
sm.OnOther(ev)
}
return
}
@@ -137,62 +143,187 @@ func (sm *FSSessionManager) UnparkCall(uuid, call_dest_nb, notify string) {
fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_transfer %s %s\n\n", uuid, call_dest_nb))
}
// Called on freeswitch's hearbeat event
func (sm *FSSessionManager) OnHeartBeat(ev Event) {
if sm.sessionDelegate != nil {
sm.sessionDelegate.OnHeartBeat(ev)
} else {
rater.Logger.Info("♥")
}
rater.Logger.Info("freeswitch ♥")
}
// Called on freeswitch's answer event
func (sm *FSSessionManager) OnChannelPark(ev Event) {
if sm.sessionDelegate != nil {
sm.sessionDelegate.OnChannelPark(ev, sm)
} else {
rater.Logger.Info("park")
rater.Logger.Info("freeswitch park")
startTime, err := ev.GetStartTime(PARK_TIME)
if err != nil {
rater.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// if there is no account configured leave the call alone
if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID {
return
}
if ev.MissingParameter() {
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER)
rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
cd := rater.CallDescriptor{
Direction: ev.GetDirection(),
Tenant: ev.GetTenant(),
TOR: ev.GetTOR(),
Subject: ev.GetSubject(),
Account: ev.GetAccount(),
Destination: ev.GetDestination(),
Amount: sm.debitPeriod.Seconds(),
TimeStart: startTime}
var remainingSeconds float64
err = sm.connector.GetMaxSessionTime(cd, &remainingSeconds)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR)
return
}
rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds))
if remainingSeconds == 0 {
rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey()))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS)
return
}
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK)
}
// Called on freeswitch's answer event
func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
if sm.sessionDelegate != nil {
s := NewSession(ev, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
sm.sessionDelegate.OnChannelAnswer(ev, s)
}
} else {
rater.Logger.Info("answer")
}
rater.Logger.Info("freeswitch answer")
}
// Called on freeswitch's hangup event
func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
s := sm.GetSession(ev.GetUUID())
if sm.sessionDelegate != nil {
sm.sessionDelegate.OnChannelHangupComplete(ev, s)
s.SaveOperations()
} else {
rater.Logger.Info("HangupComplete")
if ev.GetReqType() == REQTYPE_POSTPAID {
startTime, err := ev.GetStartTime(START_TIME)
if err != nil {
rater.Logger.Crit("Error parsing postpaid call start time from event")
return
}
endTime, err := ev.GetEndTime()
if err != nil {
rater.Logger.Crit("Error parsing postpaid call start time from event")
return
}
cd := rater.CallDescriptor{
Direction: ev.GetDirection(),
Tenant: ev.GetTenant(),
TOR: ev.GetTOR(),
Subject: ev.GetSubject(),
Account: ev.GetAccount(),
Destination: ev.GetDestination(),
TimeStart: startTime,
TimeEnd: endTime,
}
cc := &rater.CallCost{}
err = sm.connector.Debit(cd, cc)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID()))
return
}
s.CallCosts = append(s.CallCosts, cc)
return
}
if s != nil {
s.Close()
if s == nil || len(s.CallCosts) == 0 {
return // why would we have 0 callcosts
}
lastCC := s.CallCosts[len(s.CallCosts)-1]
// put credit back
start := time.Now()
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refoundDuration := end.Sub(start).Seconds()
cost := 0.0
seconds := 0.0
rater.Logger.Info(fmt.Sprintf("Refund duration: %v", refoundDuration))
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration().Seconds()
if refoundDuration <= tsDuration {
// find procentage
procentage := (refoundDuration * 100) / tsDuration
tmpCost := (procentage * ts.Cost) / 100
ts.Cost -= tmpCost
cost += tmpCost
if ts.MinuteInfo != nil {
// DestinationPrefix and Price take from lastCC and above caclulus
seconds += (procentage * ts.MinuteInfo.Quantity) / 100
}
// set the end time to now
ts.TimeEnd = start
break // do not go to other timespans
} else {
cost += ts.Cost
if ts.MinuteInfo != nil {
seconds += ts.MinuteInfo.Quantity
}
// remove the timestamp entirely
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refound
refoundDuration -= tsDuration
}
}
if cost > 0 {
cd := &rater.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
TOR: lastCC.TOR,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Amount: -cost,
}
var response float64
err := sm.connector.DebitCents(*cd, &response)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
if seconds > 0 {
cd := &rater.CallDescriptor{
Direction: lastCC.Direction,
TOR: lastCC.TOR,
Tenant: lastCC.Tenant,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Amount: -seconds,
}
var response float64
err := sm.connector.DebitSeconds(*cd, &response)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Debit seconds failed: %v", err))
}
}
lastCC.Cost -= cost
rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds))
}
// Called on freeswitch's events not processed by the session manger,
// for logging purposes (maybe).
func (sm *FSSessionManager) OnOther(ev Event) {
//log.Printf("Other event: %s", ev.GetName())
func (sm *FSSessionManager) LoopAction(s *Session, cd *rater.CallDescriptor) {
cc := &rater.CallCost{}
cd.Amount = sm.debitPeriod.Seconds()
err := sm.connector.MaxDebit(*cd, cc)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
// disconnect session
s.sessionManager.DisconnectSession(s, SYSTEM_ERROR)
}
nbts := len(cc.Timespans)
remainingSeconds := 0.0
rater.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc))
if nbts > 0 {
remainingSeconds = cc.Timespans[nbts-1].TimeEnd.Sub(cc.Timespans[0].TimeStart).Seconds()
}
if remainingSeconds == 0 || err != nil {
rater.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s))
s.Disconnect()
return
}
s.CallCosts = append(s.CallCosts, cc)
}
func (sm *FSSessionManager) GetSessionDelegate() *SessionDelegate {
return sm.sessionDelegate
func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
return sm.debitPeriod
}
func (sm *FSSessionManager) GetDbLogger() rater.DataStorage {
return sm.loggerDB
}

View File

@@ -84,10 +84,9 @@ func (s *Session) startDebitLoop() {
if nextCd.TimeEnd != s.callDescriptor.TimeEnd { // first time use the session start time
nextCd.TimeStart = time.Now()
}
sd := s.sessionManager.GetSessionDelegate()
nextCd.TimeEnd = time.Now().Add(sd.GetDebitPeriod())
sd.LoopAction(s, &nextCd)
time.Sleep(sd.GetDebitPeriod())
nextCd.TimeEnd = time.Now().Add(s.sessionManager.GetDebitPeriod())
s.sessionManager.LoopAction(s, &nextCd)
time.Sleep(s.sessionManager.GetDebitPeriod())
}
}

View File

@@ -1,213 +0,0 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012 Radu Ioan Fericean
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"fmt"
"github.com/cgrates/cgrates/rater"
"strings"
"time"
)
// Sample SessionDelegate calling the timespans methods through the RPC interface
type SessionDelegate struct {
Connector rater.Connector
DebitPeriod time.Duration
}
func (rsd *SessionDelegate) OnHeartBeat(ev Event) {
rater.Logger.Info("freeswitch ♥")
}
func (rsd *SessionDelegate) OnChannelPark(ev Event, sm SessionManager) {
rater.Logger.Info("freeswitch park")
startTime, err := ev.GetStartTime(PARK_TIME)
if err != nil {
rater.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// if there is no account configured leave the call alone
if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID {
return
}
if ev.MissingParameter() {
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER)
rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
cd := rater.CallDescriptor{
Direction: ev.GetDirection(),
Tenant: ev.GetTenant(),
TOR: ev.GetTOR(),
Subject: ev.GetSubject(),
Account: ev.GetAccount(),
Destination: ev.GetDestination(),
Amount: rsd.DebitPeriod.Seconds(),
TimeStart: startTime}
var remainingSeconds float64
err = rsd.Connector.GetMaxSessionTime(cd, &remainingSeconds)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR)
return
}
rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds))
if remainingSeconds == 0 {
rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey()))
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS)
return
}
sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK)
}
func (rsd *SessionDelegate) OnChannelAnswer(ev Event, s *Session) {
rater.Logger.Info("freeswitch answer")
}
func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
if ev.GetReqType() == REQTYPE_POSTPAID {
startTime, err := ev.GetStartTime(START_TIME)
if err != nil {
rater.Logger.Crit("Error parsing postpaid call start time from event")
return
}
endTime, err := ev.GetEndTime()
if err != nil {
rater.Logger.Crit("Error parsing postpaid call start time from event")
return
}
cd := rater.CallDescriptor{
Direction: ev.GetDirection(),
Tenant: ev.GetTenant(),
TOR: ev.GetTOR(),
Subject: ev.GetSubject(),
Account: ev.GetAccount(),
Destination: ev.GetDestination(),
TimeStart: startTime,
TimeEnd: endTime,
}
cc := &rater.CallCost{}
err = rsd.Connector.Debit(cd, cc)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID()))
return
}
s.CallCosts = append(s.CallCosts, cc)
return
}
if s == nil || len(s.CallCosts) == 0 {
return // why would we have 0 callcosts
}
lastCC := s.CallCosts[len(s.CallCosts)-1]
// put credit back
start := time.Now()
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refoundDuration := end.Sub(start).Seconds()
cost := 0.0
seconds := 0.0
rater.Logger.Info(fmt.Sprintf("Refund duration: %v", refoundDuration))
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration().Seconds()
if refoundDuration <= tsDuration {
// find procentage
procentage := (refoundDuration * 100) / tsDuration
tmpCost := (procentage * ts.Cost) / 100
ts.Cost -= tmpCost
cost += tmpCost
if ts.MinuteInfo != nil {
// DestinationPrefix and Price take from lastCC and above caclulus
seconds += (procentage * ts.MinuteInfo.Quantity) / 100
}
// set the end time to now
ts.TimeEnd = start
break // do not go to other timespans
} else {
cost += ts.Cost
if ts.MinuteInfo != nil {
seconds += ts.MinuteInfo.Quantity
}
// remove the timestamp entirely
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refound
refoundDuration -= tsDuration
}
}
if cost > 0 {
cd := &rater.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
TOR: lastCC.TOR,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Amount: -cost,
}
var response float64
err := rsd.Connector.DebitCents(*cd, &response)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
if seconds > 0 {
cd := &rater.CallDescriptor{
Direction: lastCC.Direction,
TOR: lastCC.TOR,
Tenant: lastCC.Tenant,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Amount: -seconds,
}
var response float64
err := rsd.Connector.DebitSeconds(*cd, &response)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Debit seconds failed: %v", err))
}
}
lastCC.Cost -= cost
rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds))
}
func (rsd *SessionDelegate) LoopAction(s *Session, cd *rater.CallDescriptor) {
cc := &rater.CallCost{}
cd.Amount = rsd.DebitPeriod.Seconds()
err := rsd.Connector.MaxDebit(*cd, cc)
if err != nil {
rater.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
// disconnect session
s.sessionManager.DisconnectSession(s, SYSTEM_ERROR)
}
nbts := len(cc.Timespans)
remainingSeconds := 0.0
rater.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc))
if nbts > 0 {
remainingSeconds = cc.Timespans[nbts-1].TimeEnd.Sub(cc.Timespans[0].TimeStart).Seconds()
}
if remainingSeconds == 0 || err != nil {
rater.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s))
s.Disconnect()
return
}
s.CallCosts = append(s.CallCosts, cc)
}
func (rsd *SessionDelegate) GetDebitPeriod() time.Duration {
return rsd.DebitPeriod
}

View File

@@ -20,11 +20,12 @@ package sessionmanager
import (
"github.com/cgrates/cgrates/rater"
"time"
)
type SessionManager interface {
DisconnectSession(*Session, string)
UnparkCall(string, string, string)
GetSessionDelegate() *SessionDelegate
LoopAction(*Session, *rater.CallDescriptor)
GetDebitPeriod() time.Duration
GetDbLogger() rater.DataStorage
}