diff --git a/config/config_defaults.go b/config/config_defaults.go index a0caa38b5..9d1cc2d87 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -210,6 +210,7 @@ const CGRATES_CFG_JSON = ` "debit_interval": "0s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this "max_call_duration": "3h", // maximum call duration a prepaid call can last + "session_ttl": "0s", // time after a session with no updates is terminated }, diff --git a/config/config_json_test.go b/config/config_json_test.go index e80cb6aaa..b7bc3c0c8 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -338,6 +338,7 @@ func TestSmGenericJsonCfg(t *testing.T) { Debit_interval: utils.StringPointer("0s"), Min_call_duration: utils.StringPointer("0s"), Max_call_duration: utils.StringPointer("3h"), + Session_ttl: utils.StringPointer("0s"), } if cfg, err := dfCgrJsonCfg.SmGenericJsonCfg(); err != nil { t.Error(err) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index e40dedcdd..2dca787a4 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -174,6 +174,7 @@ type SmGenericJsonCfg struct { Debit_interval *string Min_call_duration *string Max_call_duration *string + Session_ttl *string } // SM-FreeSWITCH config section diff --git a/config/smconfig.go b/config/smconfig.go index 50e503bce..e828c76a0 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -79,6 +79,7 @@ type SmGenericConfig struct { DebitInterval time.Duration MinCallDuration time.Duration MaxCallDuration time.Duration + SessionTTL time.Duration } func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error { @@ -113,6 +114,11 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error { return err } } + if jsnCfg.Session_ttl != nil { + if self.SessionTTL, err = utils.ParseDurationWithSecs(*jsnCfg.Session_ttl); err != nil { + return err + } + } return nil } diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 4e7790c06..87cc9a860 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -187,6 +187,7 @@ // "debit_interval": "0s", // interval to perform debits on. // "min_call_duration": "0s", // only authorize calls with allowed duration higher than this // "max_call_duration": "3h", // maximum call duration a prepaid call can last +// "session_ttl": "0s", // time after a session with no updates is terminated //}, diff --git a/data/conf/samples/smg/cgrates.json b/data/conf/samples/smg/cgrates.json index 0cbfc28c2..d6aaf2a6e 100644 --- a/data/conf/samples/smg/cgrates.json +++ b/data/conf/samples/smg/cgrates.json @@ -27,6 +27,7 @@ "enabled": true, "rater": "internal", "cdrs": "internal", + "session_ttl": "10ms", }, } diff --git a/sessionmanager/data_it_test.go b/sessionmanager/data_it_test.go index 08b701df8..75e927719 100644 --- a/sessionmanager/data_it_test.go +++ b/sessionmanager/data_it_test.go @@ -384,3 +384,52 @@ func TestSMGDataDerivedChargingNoCredit(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } } + +func TestSMGDataTTLExpired(t *testing.T) { + if !*testIntegration { + return + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1010"} + eAcntVal := 49999897600.000000 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) + } + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.DATA, + utils.ACCID: "12349", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1010", + utils.SUBJECT: "1010", + utils.DESTINATION: "222", + utils.CATEGORY: "data", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1048576", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 1.048576e+06 { + t.Error("Bad max usage: ", maxUsage) + } + eAcntVal = 49998842880.000000 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) + } + time.Sleep(50 * time.Millisecond) + eAcntVal = 49999897600.000000 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) + } +} diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index dd5e27fd5..1dcbe7508 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -35,24 +35,47 @@ var ErrPartiallyExecuted = errors.New("Partially executed") func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric { gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, - sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.Guardian} + sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), sessionsMux: new(sync.RWMutex), guard: engine.Guardian} return gsm } type SMGeneric struct { - cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple - rater engine.Connector - cdrsrv engine.Connector - timezone string - sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging - extconns *SMGExternalConnections // Reference towards external connections manager - sessionsMux *sync.Mutex // Locks sessions map - guard *engine.GuardianLock // Used to lock on uuid + cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + rater engine.Connector + cdrsrv engine.Connector + timezone string + sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging + sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires + extconns *SMGExternalConnections // Reference towards external connections manager + sessionsMux *sync.RWMutex // Locks sessions map + guard *engine.GuardianLock // Used to lock on uuid +} +type smgSessionTerminator struct { + timer *time.Timer + endChan chan bool } func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { self.sessionsMux.Lock() self.sessions[uuid] = append(self.sessions[uuid], s) + if self.cgrCfg.SmGenericConfig.SessionTTL > 0 { + if _, found := self.sessionTerminators[uuid]; !found { + timer := time.NewTimer(self.cgrCfg.SmGenericConfig.SessionTTL) + endChan := make(chan bool, 1) + go func() { + select { + case <-timer.C: + self.sessionEnd(uuid, 0) + case <-endChan: + timer.Stop() + } + }() + self.sessionTerminators[uuid] = &smgSessionTerminator{ + timer: timer, + endChan: endChan, + } + } + } self.sessionsMux.Unlock() } @@ -60,10 +83,14 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { func (self *SMGeneric) unindexSession(uuid string) bool { self.sessionsMux.Lock() defer self.sessionsMux.Unlock() - if _, hasIt := self.sessions[uuid]; !hasIt { + if _, found := self.sessions[uuid]; !found { return false } delete(self.sessions, uuid) + if st, found := self.sessionTerminators[uuid]; found { + st.endChan <- true + delete(self.sessionTerminators, uuid) + } return true } @@ -88,11 +115,20 @@ func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string { // Returns sessions/derived for a specific uuid func (self *SMGeneric) getSession(uuid string) []*SMGSession { - self.sessionsMux.Lock() - defer self.sessionsMux.Unlock() + self.sessionsMux.RLock() + defer self.sessionsMux.RUnlock() return self.sessions[uuid] } +// Updates the timer for the session to a new ttl +func (self *SMGeneric) resetTerminatorTimer(uuid string) { + self.sessionsMux.RLock() + defer self.sessionsMux.RUnlock() + if st, found := self.sessionTerminators[uuid]; found { + st.timer.Reset(self.cgrCfg.SmGenericConfig.SessionTTL) + } +} + // Handle a new session, pass the connectionId so we can communicate on disconnect request func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error { sessionId := evStart.GetUUID() @@ -221,6 +257,7 @@ func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time // Execute debits for usage/maxUsage func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) { + self.resetTerminatorTimer(gev.GetUUID()) if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { err := self.sessionRelocate(gev.GetUUID(), initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update