added session ttl to smgeneric

This commit is contained in:
Radu Ioan Fericean
2016-04-08 00:33:21 +03:00
parent b176511212
commit 155168406c
8 changed files with 109 additions and 12 deletions

View File

@@ -210,6 +210,7 @@ const CGRATES_CFG_JSON = `
"debit_interval": "0s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
"session_ttl": "0s", // time after a session with no updates is terminated
},

View File

@@ -338,6 +338,7 @@ func TestSmGenericJsonCfg(t *testing.T) {
Debit_interval: utils.StringPointer("0s"),
Min_call_duration: utils.StringPointer("0s"),
Max_call_duration: utils.StringPointer("3h"),
Session_ttl: utils.StringPointer("0s"),
}
if cfg, err := dfCgrJsonCfg.SmGenericJsonCfg(); err != nil {
t.Error(err)

View File

@@ -174,6 +174,7 @@ type SmGenericJsonCfg struct {
Debit_interval *string
Min_call_duration *string
Max_call_duration *string
Session_ttl *string
}
// SM-FreeSWITCH config section

View File

@@ -79,6 +79,7 @@ type SmGenericConfig struct {
DebitInterval time.Duration
MinCallDuration time.Duration
MaxCallDuration time.Duration
SessionTTL time.Duration
}
func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error {
@@ -113,6 +114,11 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error {
return err
}
}
if jsnCfg.Session_ttl != nil {
if self.SessionTTL, err = utils.ParseDurationWithSecs(*jsnCfg.Session_ttl); err != nil {
return err
}
}
return nil
}

View File

@@ -187,6 +187,7 @@
// "debit_interval": "0s", // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
// "session_ttl": "0s", // time after a session with no updates is terminated
//},

View File

@@ -27,6 +27,7 @@
"enabled": true,
"rater": "internal",
"cdrs": "internal",
"session_ttl": "10ms",
},
}

View File

@@ -384,3 +384,52 @@ func TestSMGDataDerivedChargingNoCredit(t *testing.T) {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
}
func TestSMGDataTTLExpired(t *testing.T) {
if !*testIntegration {
return
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1010"}
eAcntVal := 49999897600.000000
if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
smgEv := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: utils.DATA,
utils.ACCID: "12349",
utils.DIRECTION: utils.OUT,
utils.ACCOUNT: "1010",
utils.SUBJECT: "1010",
utils.DESTINATION: "222",
utils.CATEGORY: "data",
utils.TENANT: "cgrates.org",
utils.REQTYPE: utils.META_PREPAID,
utils.SETUP_TIME: "2016-01-05 18:30:49",
utils.ANSWER_TIME: "2016-01-05 18:31:05",
utils.USAGE: "1048576",
}
var maxUsage float64
if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil {
t.Error(err)
}
if maxUsage != 1.048576e+06 {
t.Error("Bad max usage: ", maxUsage)
}
eAcntVal = 49998842880.000000
if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
time.Sleep(50 * time.Millisecond)
eAcntVal = 49999897600.000000
if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
}

View File

@@ -35,24 +35,47 @@ var ErrPartiallyExecuted = errors.New("Partially executed")
func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric {
gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone,
sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.Guardian}
sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), sessionsMux: new(sync.RWMutex), guard: engine.Guardian}
return gsm
}
type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rater engine.Connector
cdrsrv engine.Connector
timezone string
sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging
extconns *SMGExternalConnections // Reference towards external connections manager
sessionsMux *sync.Mutex // Locks sessions map
guard *engine.GuardianLock // Used to lock on uuid
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rater engine.Connector
cdrsrv engine.Connector
timezone string
sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging
sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires
extconns *SMGExternalConnections // Reference towards external connections manager
sessionsMux *sync.RWMutex // Locks sessions map
guard *engine.GuardianLock // Used to lock on uuid
}
type smgSessionTerminator struct {
timer *time.Timer
endChan chan bool
}
func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
self.sessionsMux.Lock()
self.sessions[uuid] = append(self.sessions[uuid], s)
if self.cgrCfg.SmGenericConfig.SessionTTL > 0 {
if _, found := self.sessionTerminators[uuid]; !found {
timer := time.NewTimer(self.cgrCfg.SmGenericConfig.SessionTTL)
endChan := make(chan bool, 1)
go func() {
select {
case <-timer.C:
self.sessionEnd(uuid, 0)
case <-endChan:
timer.Stop()
}
}()
self.sessionTerminators[uuid] = &smgSessionTerminator{
timer: timer,
endChan: endChan,
}
}
}
self.sessionsMux.Unlock()
}
@@ -60,10 +83,14 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
func (self *SMGeneric) unindexSession(uuid string) bool {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
if _, hasIt := self.sessions[uuid]; !hasIt {
if _, found := self.sessions[uuid]; !found {
return false
}
delete(self.sessions, uuid)
if st, found := self.sessionTerminators[uuid]; found {
st.endChan <- true
delete(self.sessionTerminators, uuid)
}
return true
}
@@ -88,11 +115,20 @@ func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
// Returns sessions/derived for a specific uuid
func (self *SMGeneric) getSession(uuid string) []*SMGSession {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
self.sessionsMux.RLock()
defer self.sessionsMux.RUnlock()
return self.sessions[uuid]
}
// Updates the timer for the session to a new ttl
func (self *SMGeneric) resetTerminatorTimer(uuid string) {
self.sessionsMux.RLock()
defer self.sessionsMux.RUnlock()
if st, found := self.sessionTerminators[uuid]; found {
st.timer.Reset(self.cgrCfg.SmGenericConfig.SessionTTL)
}
}
// Handle a new session, pass the connectionId so we can communicate on disconnect request
func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error {
sessionId := evStart.GetUUID()
@@ -221,6 +257,7 @@ func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time
// Execute debits for usage/maxUsage
func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
self.resetTerminatorTimer(gev.GetUUID())
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
err := self.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update