From 0c23c3b7c97d9729bbba1ff8fd5249eadb540456 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 6 Oct 2012 12:58:43 +0300 Subject: [PATCH] hidden fsock inside package --- cmd/cgr-rater/cgr-rater.go | 4 +- fsock/fsock.go | 120 +++++++------- fsock/fsock_test.go | 13 +- sessionmanager/fssessionmanager.go | 247 ++++++++++++++++++++++------- sessionmanager/session.go | 7 +- sessionmanager/sessiondelegate.go | 213 ------------------------- sessionmanager/sessionmanager.go | 5 +- 7 files changed, 263 insertions(+), 346 deletions(-) delete mode 100644 sessionmanager/sessiondelegate.go diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 30a826a8e..e0efae7a6 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -255,9 +255,9 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) } switch sm_switch_type { case FS: - sm := sessionmanager.NewFSSessionManager(loggerDb) dp, _ := time.ParseDuration(fmt.Sprintf("%vs", sm_debit_period)) - sm.Connect(&sessionmanager.SessionDelegate{Connector: connector, DebitPeriod: dp}, freeswitch_server, freeswitch_pass) + sm := sessionmanager.NewFSSessionManager(loggerDb, connectorm, dp) + sm.Connect(freeswitch_server, freeswitch_pass) default: rater.Logger.Err(fmt.Sprintf("Cannot start session manger of type: %s!", sm_switch_type)) exitChan <- true diff --git a/fsock/fsock.go b/fsock/fsock.go index dc188ef3a..ebb41196f 100644 --- a/fsock/fsock.go +++ b/fsock/fsock.go @@ -14,10 +14,10 @@ import ( "time" ) -var FS *FSock // Used to share FS connection via package globals +var fs *fSock // Used to share FS connection via package globals // Extracts value of a header from anywhere in content string -func HeaderVal(hdrs, hdr string) string { +func headerVal(hdrs, hdr string) string { var hdrSIdx, hdrEIdx int if hdrSIdx = strings.Index(hdrs, hdr); hdrSIdx == -1 { return "" @@ -32,7 +32,7 @@ func HeaderVal(hdrs, hdr string) string { } // FS event header values are urlencoded. Use this to decode them. On error, use original value -func UrlDecode(hdrVal string) string { +func urlDecode(hdrVal string) string { if valUnescaped, errUnescaping := url.QueryUnescape(hdrVal); errUnescaping == nil { hdrVal = valUnescaped } @@ -40,7 +40,7 @@ func UrlDecode(hdrVal string) string { } // Binary string search in slice -func IsSliceMember(ss []string, s string) bool { +func isSliceMember(ss []string, s string) bool { sort.Strings(ss) if i := sort.SearchStrings(ss, s); i < len(ss) && ss[i] == s { return true @@ -49,7 +49,7 @@ func IsSliceMember(ss []string, s string) bool { } // Convert fseventStr into fseventMap -func FSEventStrToMap(fsevstr string, headers []string) map[string]string { +func fsEventStrToMap(fsevstr string, headers []string) map[string]string { fsevent := make(map[string]string) filtered := false if len(headers) != 0 { @@ -57,29 +57,29 @@ func FSEventStrToMap(fsevstr string, headers []string) map[string]string { } for _, strLn := range strings.Split(fsevstr, "\n") { if hdrVal := strings.SplitN(strLn, ": ", 2); len(hdrVal) == 2 { - if filtered && IsSliceMember(headers, hdrVal[0]) { + if filtered && isSliceMember(headers, hdrVal[0]) { continue // Loop again since we only work on filtered fields } - fsevent[hdrVal[0]] = UrlDecode(strings.TrimSpace(strings.TrimRight(hdrVal[1], "\n"))) + fsevent[hdrVal[0]] = urlDecode(strings.TrimSpace(strings.TrimRight(hdrVal[1], "\n"))) } } return fsevent } // Connects to FS and starts buffering input -func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) (*FSock, error) { +func NewFSock(fsaddr, fspaswd string, reconnects int, eventHandlers map[string]func(string)) error { eventFilters := make(map[string]string) - fsock := FSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters} - fsock.apiChan = make(chan string) // Init apichan so we can use it to pass api replies - errConn := fsock.Connect(reconnects) + fs = &fSock{fsaddress: fsaddr, fspaswd: fspaswd, eventHandlers: eventHandlers, eventFilters: eventFilters} + fs.apiChan = make(chan string) // Init apichan so we can use it to pass api replies + errConn := Connect(reconnects) if errConn != nil { - return nil, errConn + return errConn } - return &fsock, nil + return nil } // Connection to FreeSWITCH Socket -type FSock struct { +type fSock struct { conn net.Conn buffer *bufio.Reader fsaddress, @@ -90,11 +90,11 @@ type FSock struct { } // Reads headers until delimiter reached -func (self *FSock) readHeaders() (s string, err error) { +func readHeaders() (s string, err error) { bytesRead := make([]byte, 0) var readLine []byte for { - readLine, err = self.buffer.ReadBytes('\n') + readLine, err = fs.buffer.ReadBytes('\n') if err != nil { return } @@ -108,9 +108,9 @@ func (self *FSock) readHeaders() (s string, err error) { } // Reads the body from buffer, ln is given by content-length of headers -func (self *FSock) readBody(ln int) (s string, err error) { +func readBody(ln int) (s string, err error) { bytesRead := make([]byte, ln) - n, err := self.buffer.Read(bytesRead) + n, err := fs.buffer.Read(bytesRead) if err != nil { return } @@ -122,46 +122,46 @@ func (self *FSock) readBody(ln int) (s string, err error) { } // Event is made out of headers and body (if present) -func (self *FSock) readEvent() (string, string, error) { +func readEvent() (string, string, error) { var hdrs, body string var cl int var err error - if hdrs, err = self.readHeaders(); err != nil { + if hdrs, err = readHeaders(); err != nil { return "", "", err } if !strings.Contains(hdrs, "Content-Length") { //No body return hdrs, "", nil } - clStr := HeaderVal(hdrs, "Content-Length") + clStr := headerVal(hdrs, "Content-Length") if cl, err = strconv.Atoi(clStr); err != nil { return "", "", errors.New("Cannot extract content length") } - if body, err = self.readBody(cl); err != nil { + if body, err = readBody(cl); err != nil { return "", "", errors.New("Cannot extract body") } return hdrs, body, nil } // Checks if socket connected. Can be extended with pings -func (self *FSock) Connected() bool { - if self.conn == nil { +func Connected() bool { + if fs.conn == nil { return false } return true } // Disconnects from socket -func (self *FSock) Disconnect() { - if self.conn != nil { - self.conn.Close() +func Disconnect() { + if fs.conn != nil { + fs.conn.Close() } } // Auth to FS -func (self *FSock) Auth() error { - authCmd := fmt.Sprintf("auth %s\n\n", self.fspaswd) - fmt.Fprint(self.conn, authCmd) - if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK accepted") { +func Auth() error { + authCmd := fmt.Sprintf("auth %s\n\n", fs.fspaswd) + fmt.Fprint(fs.conn, authCmd) + if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK accepted") { fmt.Println("Got reply to auth:", rply) return errors.New("auth error") } @@ -169,7 +169,7 @@ func (self *FSock) Auth() error { } // Subscribe to events -func (self *FSock) EventsPlain(events []string) error { +func EventsPlain(events []string) error { if len(events) == 0 { return nil } @@ -182,15 +182,15 @@ func (self *FSock) EventsPlain(events []string) error { eventsCmd += " " + ev } eventsCmd += "\n\n" - fmt.Fprint(self.conn, eventsCmd) // Send command here - if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") { + fmt.Fprint(fs.conn, eventsCmd) // Send command here + if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") { return errors.New("event error") } return nil } // Enable filters -func (self *FSock) FilterEvents(filters map[string]string) error { +func FilterEvents(filters map[string]string) error { if len(filters) == 0 { //Nothing to filter return nil } @@ -199,40 +199,40 @@ func (self *FSock) FilterEvents(filters map[string]string) error { cmd += " " + hdr + " " + val } cmd += "\n\n" - fmt.Fprint(self.conn, cmd) - if rply, err := self.readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") { + fmt.Fprint(fs.conn, cmd) + if rply, err := readHeaders(); err != nil || !strings.Contains(rply, "Reply-Text: +OK") { return errors.New("filter error") } return nil } // Connect or reconnect -func (self *FSock) Connect(reconnects int) error { - if self.Connected() { - self.Disconnect() +func Connect(reconnects int) error { + if Connected() { + Disconnect() } var conErr error for i := 0; i < reconnects; i++ { fmt.Println("Attempting FS connect") - self.conn, conErr = net.Dial("tcp", self.fsaddress) + fs.conn, conErr = net.Dial("tcp", fs.fsaddress) if conErr == nil { // Connected, init buffer, auth and subscribe to desired events and filters - self.buffer = bufio.NewReaderSize(self.conn, 8192) // reinit buffer - if authChg, err := self.readHeaders(); err != nil || !strings.Contains(authChg, "auth/request") { + fs.buffer = bufio.NewReaderSize(fs.conn, 8192) // reinit buffer + if authChg, err := readHeaders(); err != nil || !strings.Contains(authChg, "auth/request") { return errors.New("No auth challenge received") - } else if errAuth := self.Auth(); errAuth != nil { // Auth did not succeed + } else if errAuth := Auth(); errAuth != nil { // Auth did not succeed return errAuth } // Subscribe to events handled by event handlers - handledEvs := make([]string, len(self.eventHandlers)) + handledEvs := make([]string, len(fs.eventHandlers)) j := 0 - for k := range self.eventHandlers { + for k := range fs.eventHandlers { handledEvs[j] = k j++ } - if subscribeErr := self.EventsPlain(handledEvs); subscribeErr != nil { + if subscribeErr := EventsPlain(handledEvs); subscribeErr != nil { return subscribeErr - } else if filterErr := self.FilterEvents(self.eventFilters); filterErr != nil { + } else if filterErr := FilterEvents(fs.eventFilters); filterErr != nil { return filterErr } return nil @@ -243,13 +243,13 @@ func (self *FSock) Connect(reconnects int) error { } // Send API command -func (self *FSock) SendApiCmd(cmdStr string) error { - if !self.Connected() { +func SendApiCmd(cmdStr string) error { + if !Connected() { return errors.New("Not connected to FS") } cmd := fmt.Sprintf("api %s\n\n", cmdStr) - fmt.Fprint(self.conn, cmd) - resEvent := <-self.apiChan + fmt.Fprint(fs.conn, cmd) + resEvent := <-fs.apiChan if strings.Contains(resEvent, "-ERR") { return errors.New("Command failed") } @@ -257,13 +257,13 @@ func (self *FSock) SendApiCmd(cmdStr string) error { } // Reads events from socket -func (self *FSock) ReadEvents() { +func ReadEvents() { // Read events from buffer, firing them up further for { - hdr, body, err := self.readEvent() + hdr, body, err := readEvent() if err != nil { fmt.Println("WARNING: got error while reading events: ", err.Error()) - connErr := self.Connect(3) + connErr := Connect(3) if connErr != nil { fmt.Println("FSock reader - cannot connect to FS") return @@ -271,10 +271,10 @@ func (self *FSock) ReadEvents() { continue // Connection reset } if strings.Contains(hdr, "api/response") { - self.apiChan <- hdr + body + fs.apiChan <- hdr + body } if body != "" { // We got a body, could be event, try dispatching it - self.DispatchEvent(body) + DispatchEvent(body) } } fmt.Println("Exiting ReadEvents") @@ -282,9 +282,9 @@ func (self *FSock) ReadEvents() { } // Dispatch events to handlers in async mode -func (self *FSock) DispatchEvent(event string) { - eventName := HeaderVal(event, "Event-Name") - if handlerFunc, hasHandler := self.eventHandlers[eventName]; hasHandler { +func DispatchEvent(event string) { + eventName := headerVal(event, "Event-Name") + if handlerFunc, hasHandler := fs.eventHandlers[eventName]; hasHandler { go handlerFunc(event) } } diff --git a/fsock/fsock_test.go b/fsock/fsock_test.go index f48f26087..c9fedca0c 100644 --- a/fsock/fsock_test.go +++ b/fsock/fsock_test.go @@ -38,10 +38,10 @@ func TestHeaders(t *testing.T) { if err != nil { t.Error("Error creating pype!") } - fs := FSock{} + fs = &fSock{} fs.buffer = bufio.NewReader(r) w.Write([]byte(HEADER)) - h, err := fs.readHeaders() + h, err := readHeaders() if err != nil || h != "Content-Length: 564\nContent-Type: text/event-plain\n" { t.Error("Error parsing headers: ", h, err) } @@ -52,10 +52,10 @@ func TestEvent(t *testing.T) { if err != nil { t.Error("Error creating pype!") } - fs := FSock{} + fs = &fSock{} fs.buffer = bufio.NewReader(r) w.Write([]byte(HEADER + BODY)) - h, b, err := fs.readEvent() + h, b, err := readEvent() if err != nil || h != HEADER[:len(HEADER)-1] || len(b) != 564 { t.Error("Error parsing event: ", h, b, err) } @@ -63,8 +63,7 @@ func TestEvent(t *testing.T) { func BenchmarkHeaderVal(b *testing.B) { for i := 0; i < b.N; i++ { - HeaderVal(HEADER, "Content-Length") - HeaderVal(BODY, "Event-Date-Loca") + headerVal(HEADER, "Content-Length") + headerVal(BODY, "Event-Date-Loca") } } - diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index ffa84a63c..7fce1b14e 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -20,37 +20,33 @@ package sessionmanager import ( "bufio" - "errors" "fmt" + "github.com/cgrates/cgrates/fsock" "github.com/cgrates/cgrates/rater" "net" + "strings" "time" ) -// The freeswitch session manager type holding a buffer for the network connection, -// the active sessions, and a session delegate doing specific actions on every session. +// The freeswitch session manager type holding a buffer for the network connection +// and the active sessions type FSSessionManager struct { - conn net.Conn - buf *bufio.Reader - sessions []*Session - sessionDelegate *SessionDelegate - loggerDB rater.DataStorage - address, pass string - delayFunc func() int + conn net.Conn + buf *bufio.Reader + sessions []*Session + connector rater.Connector + debitPeriod time.Duration + loggerDB rater.DataStorage + delayFunc func() int } -func NewFSSessionManager(storage rater.DataStorage) *FSSessionManager { +func NewFSSessionManager(storage rater.DataStorage, connector rater.Connector, debitPeriod time.Duration) *FSSessionManager { return &FSSessionManager{loggerDB: storage} } // Connects to the freeswitch mod_event_socket server and starts // listening for events in json format. -func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) error { - if ed == nil { - rater.Logger.Crit("Please provide a non nil SessionDelegate") - return errors.New("nil session delegate") - } - sm.sessionDelegate = ed +func (sm *FSSessionManager) Connect(address, pass string) error { sm.address = address sm.pass = pass if sm.conn != nil { @@ -67,6 +63,19 @@ func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) e fmt.Fprint(conn, fmt.Sprintf("auth %s\n\n", pass)) fmt.Fprint(conn, "event json HEARTBEAT CHANNEL_ANSWER CHANNEL_HANGUP_COMPLETE CHANNEL_PARK\n\n") fmt.Fprint(conn, "filter Call-Direction inbound\n\n") + + handlers := make(map[string]func(string)) + handlers["HEARTBEAT"] = func(s string) { fmt.Println(s) } // Example handler + if officer.FS, err = officer.NewFSock(config.FS_SOCK, config.FS_PASWD, 3, handlers); err != nil { + fmt.Println("FreeSWITCH error:", err) + exitChan <- true + return + } else if officer.FS.Connected() { + fmt.Println("Successfully connected to FreeSWITCH") + } + officer.FS.ReadEvents() + exitChan <- true // If we have reached here something went wrong + go func() { sm.delayFunc = fib() exitChan := make(chan bool) @@ -84,7 +93,6 @@ func (sm *FSSessionManager) Connect(ed *SessionDelegate, address, pass string) e // Reads from freeswitch server buffer until it encounters a '}', // than it creates an event object and calls the appropriate method -// on the session delegate. func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) { body, err := sm.buf.ReadString('}') if err != nil { @@ -92,7 +100,7 @@ func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) { // wait until a sec time.Sleep(time.Duration(sm.delayFunc()) * time.Second) // try to reconnect - err = sm.Connect(sm.sessionDelegate, sm.address, sm.pass) + err = sm.Connect(sm.address, sm.pass) if err == nil { rater.Logger.Info("Successfuly reconnected to freeswitch! ") exitChan <- true @@ -108,8 +116,6 @@ func (sm *FSSessionManager) readNextEvent(exitChan chan bool) (ev Event) { sm.OnChannelAnswer(ev) case HANGUP: sm.OnChannelHangupComplete(ev) - default: - sm.OnOther(ev) } return } @@ -137,62 +143,187 @@ func (sm *FSSessionManager) UnparkCall(uuid, call_dest_nb, notify string) { fmt.Fprint(sm.conn, fmt.Sprintf("api uuid_transfer %s %s\n\n", uuid, call_dest_nb)) } -// Called on freeswitch's hearbeat event func (sm *FSSessionManager) OnHeartBeat(ev Event) { - if sm.sessionDelegate != nil { - sm.sessionDelegate.OnHeartBeat(ev) - } else { - rater.Logger.Info("♥") - } + rater.Logger.Info("freeswitch ♥") } -// Called on freeswitch's answer event func (sm *FSSessionManager) OnChannelPark(ev Event) { - if sm.sessionDelegate != nil { - sm.sessionDelegate.OnChannelPark(ev, sm) - } else { - rater.Logger.Info("park") + rater.Logger.Info("freeswitch park") + startTime, err := ev.GetStartTime(PARK_TIME) + if err != nil { + rater.Logger.Err("Error parsing answer event start time, using time.Now!") + startTime = time.Now() } + // if there is no account configured leave the call alone + if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID { + return + } + if ev.MissingParameter() { + sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER) + rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) + return + } + cd := rater.CallDescriptor{ + Direction: ev.GetDirection(), + Tenant: ev.GetTenant(), + TOR: ev.GetTOR(), + Subject: ev.GetSubject(), + Account: ev.GetAccount(), + Destination: ev.GetDestination(), + Amount: sm.debitPeriod.Seconds(), + TimeStart: startTime} + var remainingSeconds float64 + err = sm.connector.GetMaxSessionTime(cd, &remainingSeconds) + if err != nil { + rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) + sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR) + return + } + rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds)) + if remainingSeconds == 0 { + rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey())) + sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS) + return + } + sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK) } -// Called on freeswitch's answer event func (sm *FSSessionManager) OnChannelAnswer(ev Event) { - if sm.sessionDelegate != nil { - s := NewSession(ev, sm) - if s != nil { - sm.sessions = append(sm.sessions, s) - sm.sessionDelegate.OnChannelAnswer(ev, s) - } - } else { - rater.Logger.Info("answer") - } + rater.Logger.Info("freeswitch answer") } -// Called on freeswitch's hangup event func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { s := sm.GetSession(ev.GetUUID()) - if sm.sessionDelegate != nil { - sm.sessionDelegate.OnChannelHangupComplete(ev, s) - s.SaveOperations() - } else { - rater.Logger.Info("HangupComplete") + if ev.GetReqType() == REQTYPE_POSTPAID { + startTime, err := ev.GetStartTime(START_TIME) + if err != nil { + rater.Logger.Crit("Error parsing postpaid call start time from event") + return + } + endTime, err := ev.GetEndTime() + if err != nil { + rater.Logger.Crit("Error parsing postpaid call start time from event") + return + } + cd := rater.CallDescriptor{ + Direction: ev.GetDirection(), + Tenant: ev.GetTenant(), + TOR: ev.GetTOR(), + Subject: ev.GetSubject(), + Account: ev.GetAccount(), + Destination: ev.GetDestination(), + TimeStart: startTime, + TimeEnd: endTime, + } + cc := &rater.CallCost{} + err = sm.connector.Debit(cd, cc) + if err != nil { + rater.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID())) + return + } + s.CallCosts = append(s.CallCosts, cc) + return } - if s != nil { - s.Close() + if s == nil || len(s.CallCosts) == 0 { + return // why would we have 0 callcosts } + lastCC := s.CallCosts[len(s.CallCosts)-1] + // put credit back + start := time.Now() + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refoundDuration := end.Sub(start).Seconds() + cost := 0.0 + seconds := 0.0 + rater.Logger.Info(fmt.Sprintf("Refund duration: %v", refoundDuration)) + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration().Seconds() + if refoundDuration <= tsDuration { + // find procentage + procentage := (refoundDuration * 100) / tsDuration + tmpCost := (procentage * ts.Cost) / 100 + ts.Cost -= tmpCost + cost += tmpCost + if ts.MinuteInfo != nil { + // DestinationPrefix and Price take from lastCC and above caclulus + seconds += (procentage * ts.MinuteInfo.Quantity) / 100 + } + // set the end time to now + ts.TimeEnd = start + break // do not go to other timespans + } else { + cost += ts.Cost + if ts.MinuteInfo != nil { + seconds += ts.MinuteInfo.Quantity + } + // remove the timestamp entirely + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refound + refoundDuration -= tsDuration + } + } + if cost > 0 { + cd := &rater.CallDescriptor{ + Direction: lastCC.Direction, + Tenant: lastCC.Tenant, + TOR: lastCC.TOR, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Amount: -cost, + } + var response float64 + err := sm.connector.DebitCents(*cd, &response) + if err != nil { + rater.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) + } + } + if seconds > 0 { + cd := &rater.CallDescriptor{ + Direction: lastCC.Direction, + TOR: lastCC.TOR, + Tenant: lastCC.Tenant, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Amount: -seconds, + } + var response float64 + err := sm.connector.DebitSeconds(*cd, &response) + if err != nil { + rater.Logger.Err(fmt.Sprintf("Debit seconds failed: %v", err)) + } + } + lastCC.Cost -= cost + rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds)) } -// Called on freeswitch's events not processed by the session manger, -// for logging purposes (maybe). -func (sm *FSSessionManager) OnOther(ev Event) { - //log.Printf("Other event: %s", ev.GetName()) +func (sm *FSSessionManager) LoopAction(s *Session, cd *rater.CallDescriptor) { + cc := &rater.CallCost{} + cd.Amount = sm.debitPeriod.Seconds() + err := sm.connector.MaxDebit(*cd, cc) + if err != nil { + rater.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) + // disconnect session + s.sessionManager.DisconnectSession(s, SYSTEM_ERROR) + } + nbts := len(cc.Timespans) + remainingSeconds := 0.0 + rater.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) + if nbts > 0 { + remainingSeconds = cc.Timespans[nbts-1].TimeEnd.Sub(cc.Timespans[0].TimeStart).Seconds() + } + if remainingSeconds == 0 || err != nil { + rater.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) + s.Disconnect() + return + } + s.CallCosts = append(s.CallCosts, cc) } - -func (sm *FSSessionManager) GetSessionDelegate() *SessionDelegate { - return sm.sessionDelegate +func (sm *FSSessionManager) GetDebitPeriod() time.Duration { + return sm.debitPeriod } - func (sm *FSSessionManager) GetDbLogger() rater.DataStorage { return sm.loggerDB } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index c2e1b6461..5ffc5112e 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -84,10 +84,9 @@ func (s *Session) startDebitLoop() { if nextCd.TimeEnd != s.callDescriptor.TimeEnd { // first time use the session start time nextCd.TimeStart = time.Now() } - sd := s.sessionManager.GetSessionDelegate() - nextCd.TimeEnd = time.Now().Add(sd.GetDebitPeriod()) - sd.LoopAction(s, &nextCd) - time.Sleep(sd.GetDebitPeriod()) + nextCd.TimeEnd = time.Now().Add(s.sessionManager.GetDebitPeriod()) + s.sessionManager.LoopAction(s, &nextCd) + time.Sleep(s.sessionManager.GetDebitPeriod()) } } diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go deleted file mode 100644 index 5a733f65f..000000000 --- a/sessionmanager/sessiondelegate.go +++ /dev/null @@ -1,213 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "fmt" - "github.com/cgrates/cgrates/rater" - "strings" - "time" -) - -// Sample SessionDelegate calling the timespans methods through the RPC interface -type SessionDelegate struct { - Connector rater.Connector - DebitPeriod time.Duration -} - -func (rsd *SessionDelegate) OnHeartBeat(ev Event) { - rater.Logger.Info("freeswitch ♥") -} - -func (rsd *SessionDelegate) OnChannelPark(ev Event, sm SessionManager) { - rater.Logger.Info("freeswitch park") - startTime, err := ev.GetStartTime(PARK_TIME) - if err != nil { - rater.Logger.Err("Error parsing answer event start time, using time.Now!") - startTime = time.Now() - } - // if there is no account configured leave the call alone - if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID { - return - } - if ev.MissingParameter() { - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), MISSING_PARAMETER) - rater.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) - return - } - cd := rater.CallDescriptor{ - Direction: ev.GetDirection(), - Tenant: ev.GetTenant(), - TOR: ev.GetTOR(), - Subject: ev.GetSubject(), - Account: ev.GetAccount(), - Destination: ev.GetDestination(), - Amount: rsd.DebitPeriod.Seconds(), - TimeStart: startTime} - var remainingSeconds float64 - err = rsd.Connector.GetMaxSessionTime(cd, &remainingSeconds) - if err != nil { - rater.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), SYSTEM_ERROR) - return - } - rater.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingSeconds)) - if remainingSeconds == 0 { - rater.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey())) - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), INSUFFICIENT_FUNDS) - return - } - sm.UnparkCall(ev.GetUUID(), ev.GetCallDestNb(), AUTH_OK) -} - -func (rsd *SessionDelegate) OnChannelAnswer(ev Event, s *Session) { - rater.Logger.Info("freeswitch answer") -} - -func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { - if ev.GetReqType() == REQTYPE_POSTPAID { - startTime, err := ev.GetStartTime(START_TIME) - if err != nil { - rater.Logger.Crit("Error parsing postpaid call start time from event") - return - } - endTime, err := ev.GetEndTime() - if err != nil { - rater.Logger.Crit("Error parsing postpaid call start time from event") - return - } - cd := rater.CallDescriptor{ - Direction: ev.GetDirection(), - Tenant: ev.GetTenant(), - TOR: ev.GetTOR(), - Subject: ev.GetSubject(), - Account: ev.GetAccount(), - Destination: ev.GetDestination(), - TimeStart: startTime, - TimeEnd: endTime, - } - cc := &rater.CallCost{} - err = rsd.Connector.Debit(cd, cc) - if err != nil { - rater.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID())) - return - } - s.CallCosts = append(s.CallCosts, cc) - return - } - - if s == nil || len(s.CallCosts) == 0 { - return // why would we have 0 callcosts - } - lastCC := s.CallCosts[len(s.CallCosts)-1] - // put credit back - start := time.Now() - end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd - refoundDuration := end.Sub(start).Seconds() - cost := 0.0 - seconds := 0.0 - rater.Logger.Info(fmt.Sprintf("Refund duration: %v", refoundDuration)) - for i := len(lastCC.Timespans) - 1; i >= 0; i-- { - ts := lastCC.Timespans[i] - tsDuration := ts.GetDuration().Seconds() - if refoundDuration <= tsDuration { - // find procentage - procentage := (refoundDuration * 100) / tsDuration - tmpCost := (procentage * ts.Cost) / 100 - ts.Cost -= tmpCost - cost += tmpCost - if ts.MinuteInfo != nil { - // DestinationPrefix and Price take from lastCC and above caclulus - seconds += (procentage * ts.MinuteInfo.Quantity) / 100 - } - // set the end time to now - ts.TimeEnd = start - break // do not go to other timespans - } else { - cost += ts.Cost - if ts.MinuteInfo != nil { - seconds += ts.MinuteInfo.Quantity - } - // remove the timestamp entirely - lastCC.Timespans = lastCC.Timespans[:i] - // continue to the next timespan with what is left to refound - refoundDuration -= tsDuration - } - } - if cost > 0 { - cd := &rater.CallDescriptor{ - Direction: lastCC.Direction, - Tenant: lastCC.Tenant, - TOR: lastCC.TOR, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Amount: -cost, - } - var response float64 - err := rsd.Connector.DebitCents(*cd, &response) - if err != nil { - rater.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) - } - } - if seconds > 0 { - cd := &rater.CallDescriptor{ - Direction: lastCC.Direction, - TOR: lastCC.TOR, - Tenant: lastCC.Tenant, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Amount: -seconds, - } - var response float64 - err := rsd.Connector.DebitSeconds(*cd, &response) - if err != nil { - rater.Logger.Err(fmt.Sprintf("Debit seconds failed: %v", err)) - } - } - lastCC.Cost -= cost - rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds)) -} - -func (rsd *SessionDelegate) LoopAction(s *Session, cd *rater.CallDescriptor) { - cc := &rater.CallCost{} - cd.Amount = rsd.DebitPeriod.Seconds() - err := rsd.Connector.MaxDebit(*cd, cc) - if err != nil { - rater.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) - // disconnect session - s.sessionManager.DisconnectSession(s, SYSTEM_ERROR) - } - nbts := len(cc.Timespans) - remainingSeconds := 0.0 - rater.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) - if nbts > 0 { - remainingSeconds = cc.Timespans[nbts-1].TimeEnd.Sub(cc.Timespans[0].TimeStart).Seconds() - } - if remainingSeconds == 0 || err != nil { - rater.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) - s.Disconnect() - return - } - s.CallCosts = append(s.CallCosts, cc) -} -func (rsd *SessionDelegate) GetDebitPeriod() time.Duration { - return rsd.DebitPeriod -} diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 8b339388f..0f360243f 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -20,11 +20,12 @@ package sessionmanager import ( "github.com/cgrates/cgrates/rater" + "time" ) type SessionManager interface { DisconnectSession(*Session, string) - UnparkCall(string, string, string) - GetSessionDelegate() *SessionDelegate + LoopAction(*Session, *rater.CallDescriptor) + GetDebitPeriod() time.Duration GetDbLogger() rater.DataStorage }