Merge pull request #1878 from Trial97/session_update

Added alterable_fields for sessions
This commit is contained in:
Dan Christian Bogos
2020-01-20 09:41:55 +01:00
committed by GitHub
15 changed files with 73 additions and 17 deletions

View File

@@ -418,7 +418,8 @@ const CGRATES_CFG_JSON = `
"session_indexes": [], // index sessions based on these fields for GetActiveSessions API
"client_protocol": 1.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0">
"channel_sync_interval": "0", // sync channels to detect stale sessions (0 to disable)
"terminate_attempts": 5 // attempts to get the session before terminating it
"terminate_attempts": 5, // attempts to get the session before terminating it
"alterable_fields": [], // the session fields that can be updated
},

View File

@@ -394,6 +394,7 @@ func testCGRConfigReloadSessionS(t *testing.T) {
SessionIndexes: utils.NewStringMap(),
ClientProtocol: 1,
TerminateAttempts: 5,
AlterableFields: utils.NewStringSet([]string{}),
}
if !reflect.DeepEqual(expAttr, cfg.SessionSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SessionSCfg()))
@@ -953,6 +954,7 @@ func testCGRConfigReloadConfigFromJSONSessionS(t *testing.T) {
SessionIndexes: utils.NewStringMap(),
ClientProtocol: 1,
TerminateAttempts: 5,
AlterableFields: utils.NewStringSet([]string{}),
}
if !reflect.DeepEqual(expAttr, cfg.SessionSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SessionSCfg()))
@@ -993,6 +995,7 @@ func testCGRConfigReloadAll(t *testing.T) {
SessionIndexes: utils.NewStringMap(),
ClientProtocol: 1,
TerminateAttempts: 5,
AlterableFields: utils.NewStringSet([]string{}),
}
if !reflect.DeepEqual(expAttr, cfg.SessionSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SessionSCfg()))

View File

@@ -648,6 +648,7 @@ func TestSmgJsonCfg(t *testing.T) {
Client_protocol: utils.Float64Pointer(1.0),
Channel_sync_interval: utils.StringPointer("0"),
Terminate_attempts: utils.IntPointer(5),
Alterable_fields: &[]string{},
}
if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -646,6 +646,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
ClientProtocol: 1.0,
ChannelSyncInterval: 0,
TerminateAttempts: 5,
AlterableFields: utils.NewStringSet([]string{}),
}
if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) {
t.Errorf("expecting: %s, received: %s",

View File

@@ -215,6 +215,11 @@ func (cfg *CGRConfig) checkConfigSanity() error {
if cfg.cacheCfg[utils.CacheClosedSessions].Limit == 0 {
return fmt.Errorf("<%s> %s needs to be != 0, received: %d", utils.CacheS, utils.CacheClosedSessions, cfg.cacheCfg[utils.CacheClosedSessions].Limit)
}
for alfld := range cfg.sessionSCfg.AlterableFields.Data() {
if utils.ProtectedSFlds.Has(alfld) {
return fmt.Errorf("<%s> The following protected field can't be altered by session: <%s>", utils.SessionS, alfld)
}
}
}
// FreeSWITCHAgent checks

View File

@@ -235,6 +235,7 @@ type SessionSJsonCfg struct {
Client_protocol *float64
Channel_sync_interval *string
Terminate_attempts *int
Alterable_fields *[]string
}
// FreeSWITCHAgent config section

View File

@@ -87,6 +87,7 @@ type SessionSCfg struct {
ClientProtocol float64
ChannelSyncInterval time.Duration
TerminateAttempts int
AlterableFields *utils.StringSet
}
func (scfg *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
@@ -249,6 +250,9 @@ func (scfg *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
if jsnCfg.Terminate_attempts != nil {
scfg.TerminateAttempts = *jsnCfg.Terminate_attempts
}
if jsnCfg.Alterable_fields != nil {
scfg.AlterableFields = utils.NewStringSet(*jsnCfg.Alterable_fields)
}
return nil
}

View File

@@ -59,6 +59,7 @@
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"],
"chargers_conns": ["*internal"],
"alterable_fields": ["Extra1"],
},

View File

@@ -66,6 +66,7 @@
"rals_conns": ["conn1"],
"cdrs_conns": ["conn1"],
"chargers_conns": ["*internal"],
"alterable_fields": ["Extra1"],
},

View File

@@ -27,13 +27,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
var protectedSFlds = engine.MapEvent{
utils.CGRID: struct{}{},
utils.OriginHost: struct{}{},
utils.OriginID: struct{}{},
utils.Usage: struct{}{},
}
var unratedReqs = engine.MapEvent{
utils.META_POSTPAID: struct{}{},
utils.META_PSEUDOPREPAID: struct{}{},

View File

@@ -305,3 +305,28 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur
}
return
}
// updateSRuns updates the SRuns event with the alterable fields (is not thread safe)
func (s *Session) updateSRuns(updEv engine.MapEvent, alterableFields *utils.StringSet) {
if alterableFields.Size() == 0 {
return
}
for k, v := range updEv {
if !alterableFields.Has(k) {
continue
}
for _, sr := range s.SRuns {
sr.Event[k] = v
}
}
}
// UpdateSRuns updates the SRuns event with the alterable fields (is thread safe)
func (s *Session) UpdateSRuns(updEv engine.MapEvent, alterableFields *utils.StringSet) {
if alterableFields.Size() == 0 { // do not lock if we can't update any field
return
}
s.Lock()
s.updateSRuns(updEv, alterableFields)
s.Unlock()
}

View File

@@ -1352,11 +1352,12 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent, isMsg bool
// update fields from new event
for k, v := range updtEv {
if protectedSFlds.HasField(k) {
if utils.ProtectedSFlds.Has(k) {
continue
}
s.EventStart[k] = v // update previoius field with new one
}
s.updateSRuns(updtEv, sS.cgrCfg.SessionSCfg().AlterableFields)
sS.setSTerminator(s) // reset the terminator
}
//init has no updtEv
@@ -2475,6 +2476,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector,
}
}
s.UpdateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields)
if err = sS.terminateSession(s,
ev.GetDurationPtrIgnoreErrors(utils.Usage),
ev.GetDurationPtrIgnoreErrors(utils.LastUsed),
@@ -2584,14 +2586,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector,
}
// Use previously stored Session to generate CDRs
// update stored event with fields out of CDR
for k, v := range ev {
if protectedSFlds.HasField(k) {
continue
}
s.EventStart[k] = v // update previoius field with new one
}
// create one CGREvent for each session run plus *raw one
s.updateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields)
// create one CGREvent for each session run
var cgrEvs []*utils.CGREvent
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)

View File

@@ -321,6 +321,7 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: "8192", // 8 MB
utils.LastUsed: "7168",
"Extra1": "other",
},
},
}
@@ -344,6 +345,8 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
} else if len(aSessions) != 1 ||
aSessions[0].Usage != time.Duration(15360) {
t.Errorf("wrong active sessions: %v", aSessions[0].Usage)
} else if aSessions[0].ExtraFields["Extra1"] != "other" {
t.Errorf("Expected: \"other\", received: %v", aSessions[0].ExtraFields["Extra1"])
}
usage = int64(1024)
@@ -366,6 +369,8 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: "1024", // 8 MB
utils.LastUsed: "5120", // 5 MB
"Extra1": "other2",
"Extra2": "other",
},
},
}
@@ -388,6 +393,10 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
} else if len(aSessions) != 1 ||
aSessions[0].Usage != time.Duration(13312) { // 14MB in used, 2MB extra reserved
t.Errorf("wrong active sessions: %+v", aSessions[0].Usage)
} else if aSessions[0].ExtraFields["Extra1"] != "other2" {
t.Errorf("Expected: \"other2\", received: %v", aSessions[0].ExtraFields["Extra1"])
} else if _, has := aSessions[0].ExtraFields["Extra"]; has {
t.Errorf("Expected: no Extra2, received: %v", aSessions[0].ExtraFields["Extra2"])
}
usage = int64(1024)
@@ -451,6 +460,7 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 49, 0, time.UTC),
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.LastUsed: "0", // refund 1024 (extra used) + 1024 (extra reserved)
"Extra1": "done",
},
},
}
@@ -489,6 +499,9 @@ func TestSessionsDataLastUsedMultipleUpdates(t *testing.T) {
if cdrs[0].Usage != "13312" {
t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
}
if cdrs[0].ExtraFields["Extra1"] != "done" {
t.Errorf("Expected: \"done\", received: %v", cdrs[0].ExtraFields["Extra1"])
}
}
}

View File

@@ -131,6 +131,9 @@ var (
CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes, CacheStatFilterIndexes,
CacheThresholdFilterIndexes, CacheSupplierFilterIndexes, CacheAttributeFilterIndexes,
CacheChargerFilterIndexes, CacheDispatcherFilterIndexes, CacheLoadIDs, CacheAccounts})
// ProtectedSFlds are the fields that sessions should not alter
ProtectedSFlds = NewStringSet([]string{CGRID, OriginHost, OriginID, Usage})
)
const (

View File

@@ -62,3 +62,11 @@ func (s *StringSet) AsSlice() []string {
func (s *StringSet) Data() map[string]struct{} {
return s.data
}
// Size returns the size of the set
func (s *StringSet) Size() int {
if s == nil || s.data == nil {
return 0
}
return len(s.data)
}