From cb2ab3224b24e9b715324310a8b48768b1ddea4a Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 6 Mar 2015 17:30:12 +0100 Subject: [PATCH] Refactored FreeSWITCH SessionManager to make use of multiple connections, give up sharing of configuration at package level, make better use of interfaces to communicate with Sessions --- cmd/cgr-engine/cgr-engine.go | 3 +- sessionmanager/fsevent_test.go | 10 +- sessionmanager/fssessionmanager.go | 240 +++++++++++------------------ sessionmanager/kamailiosm.go | 18 ++- sessionmanager/osipsevent_test.go | 2 +- sessionmanager/osipssm.go | 29 ++-- sessionmanager/session.go | 25 ++- sessionmanager/sessionmanager.go | 13 +- 8 files changed, 147 insertions(+), 193 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e6cff590c..0bd27d1ef 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -188,8 +188,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage } switch cfg.SMSwitchType { case FS: - dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)) - sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp) + sm = sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn) case KAMAILIO: var debitInterval time.Duration if debitInterval, err = utils.ParseDurationWithSecs(strconv.Itoa(cfg.SMDebitInterval)); err != nil { diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go index c52201b10..1db022952 100644 --- a/sessionmanager/fsevent_test.go +++ b/sessionmanager/fsevent_test.go @@ -417,7 +417,7 @@ Task-ID: 2 Task-Desc: heartbeat Task-Group: core Task-Runtime: 1349437318` - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) ev := new(FSEvent).AsEvent(body) setupTime, _ := ev.GetSetupTime("Event-Date-Local") @@ -471,7 +471,7 @@ Task-Runtime: 1349437318` } func TestParseFsHangup(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) ev := new(FSEvent).AsEvent(hangupEv) setupTime, _ := ev.GetSetupTime(utils.META_DEFAULT) @@ -502,7 +502,7 @@ func TestParseFsHangup(t *testing.T) { } func TestParseEventValue(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) ev := new(FSEvent).AsEvent(hangupEv) if cgrid := ev.ParseEventValue(&utils.RSRField{Id: utils.CGRID}); cgrid != "873e5bf7903978f305f7d8fed3f92f968cf82873" { @@ -613,7 +613,7 @@ Caller-Username: 04021292812` } func TestFsEvAsStoredCdr(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) ev := new(FSEvent).AsEvent(hangupEv) setupTime, _ := utils.ParseTimeDetectLayout("1398442107") @@ -629,7 +629,7 @@ func TestFsEvAsStoredCdr(t *testing.T) { } func TestFsEvGetExtraFields(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() cfg.FSCdrExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}} config.SetCgrConfig(cfg) ev := new(FSEvent).AsEvent(hangupEv) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index b135ebc41..7eddcfdac 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -19,11 +19,9 @@ along with this program. If not, see package sessionmanager import ( - "bufio" "errors" "fmt" "log/syslog" - "net" "time" "github.com/cgrates/cgrates/config" @@ -32,62 +30,63 @@ import ( "github.com/cgrates/fsock" ) -var cfg *config.CGRConfig // Share the configuration with the rest of the package - // The freeswitch session manager type holding a buffer for the network connection // and the active sessions type FSSessionManager struct { - conn net.Conn - buf *bufio.Reader - sessions []*Session - rater engine.Connector - cdrs engine.Connector - debitPeriod time.Duration - loggerDB engine.LogStorage + cfg *config.SmFsConfig + conns map[string]*fsock.FSock // Keep the list here for connection management purposes + sessions []*Session + rater engine.Connector + cdrs engine.Connector + loggerDB engine.LogStorage } -func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, rater, cdrs engine.Connector, debitPeriod time.Duration) *FSSessionManager { - cfg = cgrCfg // make config global - return &FSSessionManager{loggerDB: storage, rater: rater, cdrs: cdrs, debitPeriod: debitPeriod} +func NewFSSessionManager(smFsConfig *config.SmFsConfig, storage engine.LogStorage, rater, cdrs engine.Connector) *FSSessionManager { + return &FSSessionManager{cfg: smFsConfig, conns: make(map[string]*fsock.FSock), loggerDB: storage, rater: rater, cdrs: cdrs} } // Connects to the freeswitch mod_event_socket server and starts // listening for events. -func (sm *FSSessionManager) Connect() (err error) { +func (sm *FSSessionManager) Connect() error { eventFilters := map[string]string{"Call-Direction": "inbound"} - if fsock.FS, err = fsock.NewFSock(cfg.FreeswitchServer, cfg.FreeswitchPass, cfg.FreeswitchReconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer)); err != nil { - return err - } else if !fsock.FS.Connected() { - return errors.New("Cannot connect to FreeSWITCH") + errChan := make(chan error) + for _, connCfg := range sm.cfg.Connections { + connId := utils.GenUUID() + fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer), connId) + if err != nil { + errChan <- err + } else if !fSock.Connected() { + errChan <- errors.New("Could not connect to FreeSWITCH") + } else { + sm.conns[connId] = fSock + } + go func() { // Start reading in own goroutine, return on error + if err := fsock.FS.ReadEvents(); err != nil { + errChan <- err + } + }() } - if err := fsock.FS.ReadEvents(); err != nil { - return err - } - return errors.New(" - Stopped reading events") + err := <-errChan // Will keep the Connect locked until the first error in one of the connections + return err } -func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string)) { - hb := func(body string) { +func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string, string)) { + cp := func(body, connId string) { ev := new(FSEvent).AsEvent(body) - sm.OnHeartBeat(ev) + sm.onChannelPark(ev, connId) } - cp := func(body string) { + ca := func(body, connId string) { ev := new(FSEvent).AsEvent(body) - sm.OnChannelPark(ev) + sm.onChannelAnswer(ev, connId) } - ca := func(body string) { + ch := func(body, connId string) { ev := new(FSEvent).AsEvent(body) - sm.OnChannelAnswer(ev) + sm.onChannelHangupComplete(ev) } - ch := func(body string) { - ev := new(FSEvent).AsEvent(body) - sm.OnChannelHangupComplete(ev) - } - return map[string][]func(string){ - "HEARTBEAT": []func(string){hb}, - "CHANNEL_PARK": []func(string){cp}, - "CHANNEL_ANSWER": []func(string){ca}, - "CHANNEL_HANGUP_COMPLETE": []func(string){ch}, + return map[string][]func(string, string){ + "CHANNEL_PARK": []func(string, string){cp}, + "CHANNEL_ANSWER": []func(string, string){ca}, + "CHANNEL_HANGUP_COMPLETE": []func(string, string){ch}, } } @@ -102,25 +101,25 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { } // Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(ev utils.Event, notify string) { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch: %s", err.Error())) +func (sm *FSSessionManager) DisconnectSession(ev utils.Event, connId, notify string) { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) } if notify == INSUFFICIENT_FUNDS { - if len(cfg.FSEmptyBalanceContext) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), cfg.FSEmptyBalanceContext)); err != nil { - engine.Logger.Err(" Could not transfer the call to empty balance context") + if len(sm.cfg.EmptyBalanceContext) != 0 { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId)) } return - } else if len(cfg.FSEmptyBalanceAnnFile) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), cfg.FSEmptyBalanceAnnFile)); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error())) + } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) } return } } - if err := fsock.FS.SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch: %v", err)) + if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) } return } @@ -136,112 +135,54 @@ func (sm *FSSessionManager) RemoveSession(uuid string) { } // Sets the call timeout valid of starting of the call -func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error { +func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration) error { // _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid)) - _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds()))) + _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s execute_on_answer sched_hangup +%d alloted_timeout\n\n", uuid, int(maxDur.Seconds()))) if err != nil { - engine.Logger.Err("could not send sched_hangup command to freeswitch") + engine.Logger.Err(fmt.Sprintf(" Could not send sched_hangup command to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) return err } return nil } // Sends the transfer command to unpark the call to freeswitch -func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { - _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) +func (sm *FSSessionManager) unparkCall(uuid, connId, call_dest_nb, notify string) { + _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) if err != nil { - engine.Logger.Err(" Could not send unpark api notification to freeswitch") + engine.Logger.Err(fmt.Sprintf(" Could not send unpark api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) } - if _, err = fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { - engine.Logger.Err(" Could not send unpark api call to freeswitch") + if _, err = sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send unpark api call to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) } } -func (sm *FSSessionManager) OnHeartBeat(ev utils.Event) { - engine.Logger.Info("freeswitch ♥") -} - -func (sm *FSSessionManager) OnChannelPark(ev utils.Event) { - var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last - var durInitialized bool - attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), - Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} - var dcs utils.DerivedChargers - if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { - engine.Logger.Err(fmt.Sprintf(" OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination - return +func (sm *FSSessionManager) onChannelPark(ev utils.Event, connId string) { + var maxCallDuration float64 // This will be the maximum duration this channel will be allowed to last + if err := sm.rater.GetDerivedMaxSessionTime(ev, &maxCallDuration); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not get max session time for %s, error: %s", ev.GetUUID(), err.Error())) } - dcs, _ = dcs.AppendDefaultRun() - for _, dc := range dcs { - runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) - matchingAllFilters := true - for _, dcRunFilter := range runFilters { - if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass { - matchingAllFilters = false - break - } - } - if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched - continue - } - startTime, err := ev.GetAnswerTime(PARK_TIME) - if err != nil { - engine.Logger.Err("Error parsing answer event start time, using time.Now!") - startTime = time.Now() - } - if ev.MissingParameter() { - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dc.DestinationField), MISSING_PARAMETER) - engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID())) - return - } - cd := engine.CallDescriptor{ - Direction: ev.GetDirection(dc.DirectionField), - Tenant: ev.GetTenant(dc.TenantField), - Category: ev.GetCategory(dc.CategoryField), - Subject: ev.GetSubject(dc.SubjectField), - Account: ev.GetAccount(dc.AccountField), - Destination: ev.GetDestination(dc.DestinationField), - TimeStart: startTime, - TimeEnd: startTime.Add(cfg.SMMaxCallDuration), - } - var remainingDurationFloat float64 - err = sm.rater.GetMaxSessionTime(cd, &remainingDurationFloat) - if err != nil { - engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination - return - } - remainingDuration := time.Duration(remainingDurationFloat) - // Set maxCallDuration, smallest out of all forked sessions - if !durInitialized { // first time we set it /not initialized yet - maxCallDuration = remainingDuration - durInitialized = true - } else if maxCallDuration > remainingDuration { - maxCallDuration = remainingDuration - } - } - if maxCallDuration <= cfg.SMMinCallDuration { + maxCallDur := time.Duration(maxCallDuration) + if maxCallDur <= sm.cfg.MinCallDuration { //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), INSUFFICIENT_FUNDS) return } - sm.setMaxCallDuration(ev.GetUUID(), maxCallDuration) - sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) + sm.setMaxCallDuration(ev.GetUUID(), connId, maxCallDur) + sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), AUTH_OK) } -func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) { +func (sm *FSSessionManager) onChannelAnswer(ev utils.Event, connId string) { if ev.MissingParameter() { - sm.DisconnectSession(ev, MISSING_PARAMETER) + sm.DisconnectSession(ev, connId, MISSING_PARAMETER) } - s := NewSession(ev, sm) + s := NewSession(ev, connId, sm) if s != nil { sm.sessions = append(sm.sessions, s) } } -func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) { - go sm.processCdr(ev.AsStoredCdr()) +func (sm *FSSessionManager) onChannelHangupComplete(ev utils.Event) { + go sm.ProcessCdr(ev.AsStoredCdr()) s := sm.GetSession(ev.GetUUID()) if s == nil { // Not handled by us return @@ -252,7 +193,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) { } } -func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error { +func (sm *FSSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error { if sm.cdrs != nil { var reply string if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil { @@ -263,15 +204,10 @@ func (sm *FSSessionManager) processCdr(storedCdr *utils.StoredCdr) error { return nil } -func (sm *FSSessionManager) GetDebitPeriod() time.Duration { - return sm.debitPeriod +func (sm *FSSessionManager) DebitInterval() time.Duration { + return sm.cfg.DebitInterval } - -func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { - return sm.rater.MaxDebit(*cd, cc) -} - -func (sm *FSSessionManager) GetDbLogger() engine.LogStorage { +func (sm *FSSessionManager) DbLogger() engine.LogStorage { return sm.loggerDB } @@ -279,17 +215,27 @@ func (sm *FSSessionManager) Rater() engine.Connector { return sm.rater } -func (sm *FSSessionManager) Shutdown() (err error) { - if fsock.FS == nil || !fsock.FS.Connected() { - return errors.New("Cannot shutdown sessions, fsock not connected") +// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message +func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil { + engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: %s, connection id: %s", err.Error(), connId)) } - engine.Logger.Info("Shutting down all sessions...") - if _, err = fsock.FS.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil { - engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err)) +} + +func (sm *FSSessionManager) Shutdown() (err error) { + for connId, fSock := range sm.conns { + if !fSock.Connected() { + engine.Logger.Err(fmt.Sprintf(" Cannot shutdown sessions, fsock not connected for connection id: %s", connId)) + continue + } + engine.Logger.Info(fmt.Sprintf(" Shutting down all sessions on connection id: %s", connId)) + if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil { + engine.Logger.Err(fmt.Sprintf(" Error on calls shutdown: %s, connection id: %s", err.Error(), connId)) + } } for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ { time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired - engine.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions)) + engine.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions)) } - return + return nil } diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 301204059..14c91841c 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -75,10 +75,10 @@ func (self *KamailioSessionManager) onCallStart(evData []byte) { engine.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) } if kamEv.MissingParameter() { - self.DisconnectSession(kamEv, utils.ERR_MANDATORY_IE_MISSING) + self.DisconnectSession(kamEv, "", utils.ERR_MANDATORY_IE_MISSING) return } - s := NewSession(kamEv, self) + s := NewSession(kamEv, "", self) if s != nil { self.sessions = append(self.sessions, s) } @@ -119,7 +119,7 @@ func (self *KamailioSessionManager) Connect() error { return errors.New(" Stopped reading events") } -func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, notify string) { +func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, connId, notify string) { sessionIds := ev.GetSessionIds() disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify} if err := self.kea.Send(disconnectEv.String()); err != nil { @@ -149,24 +149,28 @@ func (self *KamailioSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engi return self.rater.MaxDebit(*cd, cc) } -func (self *KamailioSessionManager) GetDebitPeriod() time.Duration { +func (self *KamailioSessionManager) DebitInterval() time.Duration { return self.debitInterval } -func (self *KamailioSessionManager) GetDbLogger() engine.LogStorage { +func (self *KamailioSessionManager) DbLogger() engine.LogStorage { return self.loggerDb } func (self *KamailioSessionManager) Rater() engine.Connector { return self.rater } -func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) { +func (self *KamailioSessionManager) ProcessCdr(cdr *utils.StoredCdr) error { if self.cdrsrv == nil { - return + return nil } var reply string if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CgrId, cdr.AccId, err.Error())) } + return nil +} + +func (sm *KamailioSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { } func (self *KamailioSessionManager) Shutdown() error { return nil diff --git a/sessionmanager/osipsevent_test.go b/sessionmanager/osipsevent_test.go index a6a5063e9..63e07eec0 100644 --- a/sessionmanager/osipsevent_test.go +++ b/sessionmanager/osipsevent_test.go @@ -68,7 +68,7 @@ func TestOsipsEventParseStatic(t *testing.T) { } func TestOsipsEventGetValues(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() + cfg, _ := config.NewDefaultCGRConfig() config.SetCgrConfig(cfg) setupTime, _ := osipsEv.GetSetupTime(utils.META_DEFAULT) eSetupTime, _ := utils.ParseTimeDetectLayout("1406370492") diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index bfed586c0..7550f558d 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -35,7 +35,7 @@ func NewOSipsSessionManager(cfg *config.CGRConfig, rater, cdrsrv engine.Connecto osm := &OsipsSessionManager{cgrCfg: cfg, rater: rater, cdrsrv: cdrsrv} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart}, - "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr}, + "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, "E_CGR_AUTHORIZE": []func(*osipsdagram.OsipsEvent){osm.OnAuthorize}, } return osm, nil @@ -70,7 +70,7 @@ func (osm *OsipsSessionManager) Connect() (err error) { return errors.New(" Stopped reading events") } -func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, notify string) { +func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, cgrId, notify string) { return } func (osm *OsipsSessionManager) RemoveSession(uuid string) { @@ -79,15 +79,18 @@ func (osm *OsipsSessionManager) RemoveSession(uuid string) { func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { return nil } -func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration { +func (osm *OsipsSessionManager) DebitInterval() time.Duration { var nilDuration time.Duration return nilDuration } -func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage { +func (osm *OsipsSessionManager) DbLogger() engine.LogStorage { return nil } -func (self *OsipsSessionManager) Rater() engine.Connector { - return self.rater +func (osm *OsipsSessionManager) Rater() engine.Connector { + return osm.rater +} +func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { + return } func (osm *OsipsSessionManager) Shutdown() error { return nil @@ -143,13 +146,17 @@ func (osm *OsipsSessionManager) OnOpensipsStart(cdrDagram *osipsdagram.OsipsEven go osm.SubscribeEvents(evStop) } -func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) { - var reply string +func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) { osipsEv, _ := NewOsipsEvent(cdrDagram) - storedCdr := osipsEv.AsStoredCdr() - if err := osm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) + if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error())) } + +} + +func (osm *OsipsSessionManager) ProcessCdr(storedCdr *utils.StoredCdr) error { + var reply string + return osm.cdrsrv.ProcessCdr(storedCdr, &reply) } // Process Authorize request from OpenSIPS and communicate back maxdur diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 10981c30d..12f782c0c 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/fsock" ) // Session type holding the call information fields, a session delegate for specific @@ -34,6 +33,8 @@ type Session struct { eventStart utils.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid) stopDebit chan bool // Channel to communicate with debit loops when closing the session sessionManager SessionManager + connId string // Reference towards connection id on the session manager side. + warnMinDur time.Duration sessionRuns []*engine.SessionRun } @@ -51,10 +52,11 @@ func (s *Session) SessionRuns() []*engine.SessionRun { } // Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually -func NewSession(ev utils.Event, sm SessionManager) *Session { +func NewSession(ev utils.Event, connId string, sm SessionManager) *Session { s := &Session{eventStart: ev, stopDebit: make(chan bool), sessionManager: sm, + connId: connId, } if err := sm.Rater().GetSessionRuns(ev, &s.sessionRuns); err != nil || len(s.sessionRuns) == 0 { return nil @@ -69,7 +71,7 @@ func NewSession(ev utils.Event, sm SessionManager) *Session { func (s *Session) debitLoop(runIdx int) { nextCd := *s.sessionRuns[runIdx].CallDescriptor index := 0.0 - debitPeriod := s.sessionManager.GetDebitPeriod() + debitPeriod := s.sessionManager.DebitInterval() for { select { case <-s.stopDebit: @@ -83,19 +85,17 @@ func (s *Session) debitLoop(runIdx int) { nextCd.LoopIndex = index nextCd.DurationIndex += debitPeriod // first presumed duration cc := new(engine.CallCost) - if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil { + if err := s.sessionManager.Rater().MaxDebit(nextCd, cc); err != nil { engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) - s.sessionManager.DisconnectSession(s.eventStart, SYSTEM_ERROR) + s.sessionManager.DisconnectSession(s.eventStart, s.connId, SYSTEM_ERROR) return } if cc.GetDuration() == 0 { - s.sessionManager.DisconnectSession(s.eventStart, INSUFFICIENT_FUNDS) + s.sessionManager.DisconnectSession(s.eventStart, s.connId, INSUFFICIENT_FUNDS) return } - if cc.GetDuration() <= cfg.FSMinDurLowBalance && len(cfg.FSLowBalanceAnnFile) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.eventStart.GetUUID(), cfg.FSLowBalanceAnnFile)); err != nil { - engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error())) - } + if s.warnMinDur != time.Duration(0) && cc.GetDuration() <= s.warnMinDur { + s.sessionManager.WarnSessionMinDuration(s.eventStart.GetUUID(), s.connId) } s.sessionRuns[runIdx].CallCosts = append(s.sessionRuns[runIdx].CallCosts, cc) nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd @@ -204,9 +204,6 @@ func (s *Session) SaveOperations() { for _, cc := range sr.CallCosts[1:] { firstCC.Merge(cc) } - if s.sessionManager.GetDbLogger() == nil { - engine.Logger.Err(" Error: no connection to logger database, cannot save costs") - } - s.sessionManager.GetDbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) + s.sessionManager.DbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) } } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 34ad293db..95f9d9462 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -26,12 +26,13 @@ import ( ) type SessionManager interface { - Connect() error - DisconnectSession(utils.Event, string) - RemoveSession(string) - MaxDebit(*engine.CallDescriptor, *engine.CallCost) error - GetDebitPeriod() time.Duration - GetDbLogger() engine.LogStorage + DbLogger() engine.LogStorage Rater() engine.Connector + DebitInterval() time.Duration + Connect() error + DisconnectSession(utils.Event, string, string) + WarnSessionMinDuration(string, string) + RemoveSession(string) + ProcessCdr(*utils.StoredCdr) error Shutdown() error }