SMGeneric using CGRID instead of OriginID for session indexing

This commit is contained in:
DanB
2016-11-07 17:22:54 +01:00
parent 5e00fa9533
commit d253be03ac
10 changed files with 236 additions and 192 deletions

View File

@@ -99,7 +99,7 @@ type SmGenericConfig struct {
SessionTTL time.Duration
SessionTTLLastUsed *time.Duration
SessionTTLUsage *time.Duration
SessionIndexes []string
SessionIndexes utils.StringMap
}
func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error {
@@ -162,7 +162,7 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error {
}
}
if jsnCfg.Session_indexes != nil {
self.SessionIndexes = *jsnCfg.Session_indexes
self.SessionIndexes = utils.StringMapFromSlice(*jsnCfg.Session_indexes)
}
return nil
}

View File

@@ -279,7 +279,7 @@ func (s *Session) AsActiveSessions() []*ActiveSession {
pdd, _ := s.eventStart.GetPdd(utils.META_DEFAULT)
for _, sessionRun := range s.sessionRuns {
aSession := &ActiveSession{
CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()),
CGRID: s.eventStart.GetCgrId(s.sessionManager.Timezone()),
TOR: utils.VOICE,
OriginID: s.eventStart.GetUUID(),
CdrHost: s.eventStart.GetOriginatorIP(utils.META_DEFAULT),
@@ -325,7 +325,7 @@ func (s *Session) AsMapStringIface() (map[string]interface{}, error) {
// Will be used when displaying active sessions via RPC
type ActiveSession struct {
CgrId string
CGRID string
TOR string // type of record, meta-field, should map to one of the TORs hardcoded inside the server <*voice|*data|*sms|*generic>
OriginID string // represents the unique accounting id given by the telecom switch generating the CDR
CdrHost string // represents the IP address of the host generating the CDR (automatically populated by the server)

View File

@@ -230,7 +230,7 @@ func (sma *SMAsterisk) ServiceShutdown() error {
// Internal method to disconnect session in asterisk
func (sma *SMAsterisk) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error {
channelID := SMGenericEvent(args.EventStart).GetUUID()
channelID := SMGenericEvent(args.EventStart).GetOriginID(utils.META_DEFAULT)
if err := sma.hangupChannel(channelID); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), channelID))
}

View File

@@ -35,6 +35,11 @@ var (
type SMGenericEvent map[string]interface{}
func (ev SMGenericEvent) HasField(fieldName string) (hasField bool) {
_, hasField = ev[fieldName]
return
}
func (self SMGenericEvent) GetName() string {
result, _ := utils.ConvertIfaceToString(self[utils.EVENT_NAME])
return result
@@ -48,19 +53,22 @@ func (self SMGenericEvent) GetTOR(fieldName string) string {
return result
}
func (self SMGenericEvent) GetCgrId(timezone string) string {
//setupTime, _ := self.GetSetupTime(utils.META_DEFAULT, timezone)
//return utils.Sha1(self.GetUUID(), setupTime.UTC().String())
return utils.Sha1(self.GetUUID())
func (self SMGenericEvent) GetCGRID(oIDFieldName string) string {
return utils.Sha1(self.GetOriginID(oIDFieldName), self.GetOriginatorIP(utils.META_DEFAULT))
}
func (self SMGenericEvent) GetUUID() string {
result, _ := utils.ConvertIfaceToString(self[utils.ACCID])
// GetOriginID returns the OriginID from event
// fieldName offers the possibility to extract info from other fields, eg: InitialOriginID
func (self SMGenericEvent) GetOriginID(fieldName string) string {
if fieldName == utils.META_DEFAULT {
fieldName = utils.ACCID
}
result, _ := utils.ConvertIfaceToString(self[fieldName])
return result
}
func (self SMGenericEvent) GetSessionIds() []string {
return []string{self.GetUUID()}
return []string{self.GetOriginID(utils.META_DEFAULT)}
}
func (self SMGenericEvent) GetDirection(fieldName string) string {
@@ -321,11 +329,11 @@ func (self SMGenericEvent) MissingParameter(timezone string) bool {
func (self SMGenericEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) string {
switch rsrFld.Id {
case utils.CGRID:
return rsrFld.ParseValue(self.GetCgrId(timezone))
return rsrFld.ParseValue(self.GetCGRID(utils.META_DEFAULT))
case utils.TOR:
return rsrFld.ParseValue(utils.VOICE)
case utils.ACCID:
return rsrFld.ParseValue(self.GetUUID())
return rsrFld.ParseValue(self.GetOriginID(utils.META_DEFAULT))
case utils.CDRHOST:
return rsrFld.ParseValue(self.GetOriginatorIP(utils.META_DEFAULT))
case utils.CDRSOURCE:
@@ -378,9 +386,9 @@ func (self SMGenericEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
func (self SMGenericEvent) AsStoredCdr(cfg *config.CGRConfig, timezone string) *engine.CDR {
storCdr := engine.NewCDRWithDefaults(cfg)
storCdr.CGRID = self.GetCgrId(timezone)
storCdr.CGRID = self.GetCGRID(utils.META_DEFAULT)
storCdr.ToR = utils.FirstNonEmpty(self.GetTOR(utils.META_DEFAULT), storCdr.ToR) // Keep default if none in the event
storCdr.OriginID = self.GetUUID()
storCdr.OriginID = self.GetOriginID(utils.META_DEFAULT)
storCdr.OriginHost = self.GetOriginatorIP(utils.META_DEFAULT)
storCdr.Source = self.GetCdrSource()
storCdr.RequestType = utils.FirstNonEmpty(self.GetReqType(utils.META_DEFAULT), storCdr.RequestType)

View File

@@ -54,11 +54,11 @@ func TestSMGenericEventParseFields(t *testing.T) {
if smGev.GetName() != "TEST_EVENT" {
t.Error("Unexpected: ", smGev.GetName())
}
if smGev.GetCgrId("UTC") != "8cb2237d0679ca88db6464eac60da96345513964" {
t.Error("Unexpected: ", smGev.GetCgrId("UTC"))
if smGev.GetCGRID(utils.META_DEFAULT) != "cade401f46f046311ed7f62df3dfbb84adb98aad" {
t.Error("Unexpected: ", smGev.GetCGRID(utils.META_DEFAULT))
}
if smGev.GetUUID() != "12345" {
t.Error("Unexpected: ", smGev.GetUUID())
if smGev.GetOriginID(utils.META_DEFAULT) != "12345" {
t.Error("Unexpected: ", smGev.GetOriginID(utils.META_DEFAULT))
}
if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) {
t.Error("Unexpected: ", smGev.GetSessionIds())
@@ -152,7 +152,7 @@ func TestSMGenericEventAsStoredCdr(t *testing.T) {
smGev[utils.CDRHOST] = "10.0.3.15"
smGev["Extra1"] = "Value1"
smGev["Extra2"] = 5
eStoredCdr := &engine.CDR{CGRID: "8cb2237d0679ca88db6464eac60da96345513964",
eStoredCdr := &engine.CDR{CGRID: "70c4d16dce41d1f2777b4e8442cff39cf87f5f19",
ToR: utils.SMS, OriginID: "12345", OriginHost: "10.0.3.15", Source: "SMG_TEST_EVENT", RequestType: utils.META_PREPAID,
Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "account1", Subject: "subject1",
Destination: "+4986517174963", SetupTime: time.Date(2015, 11, 9, 14, 21, 24, 0, time.UTC), AnswerTime: time.Date(2015, 11, 9, 14, 22, 2, 0, time.UTC),

View File

@@ -757,7 +757,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) {
t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
var aSessions []*ActiveSession
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil {
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT),
OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
@@ -791,7 +792,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) {
} else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal {
t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil {
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT),
OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
@@ -805,7 +807,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) {
} else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal {
t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil {
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT),
OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)

View File

@@ -30,6 +30,7 @@ import (
// One session handled by SM
type SMGSession struct {
CGRID string // Unique identifier for this session
EventStart SMGenericEvent // Event which started
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
RunID string // Keep a reference for the derived run
@@ -42,7 +43,7 @@ type SMGSession struct {
LastDebit time.Duration // last real debited duration
TotalUsage time.Duration // sum of lastUsage
clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect.
rater rpcclient.RpcClientConnection // Connector to Rater service
rals rpcclient.RpcClientConnection // Connector to rals service
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
}
@@ -56,19 +57,19 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) {
return
case <-time.After(sleepDur):
if maxDebit, err := self.debit(debitInterval, nil); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not complete debit operation on session: %s, error: %s", self.EventStart.GetUUID(), err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not complete debit operation on session: %s, error: %s", self.CGRID, err.Error()))
disconnectReason := SYSTEM_ERROR
if err.Error() == utils.ErrUnauthorizedDestination.Error() {
disconnectReason = err.Error()
}
if err := self.disconnectSession(disconnectReason); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not disconnect session: %s, error: %s", self.CGRID, err.Error()))
}
return
} else if maxDebit < debitInterval {
time.Sleep(maxDebit)
if err := self.disconnectSession(INSUFFICIENT_FUNDS); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not disconnect session: %s, error: %s", self.CGRID, err.Error()))
}
return
}
@@ -108,7 +109,7 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.
self.CD.TimeEnd = self.CD.TimeStart.Add(dur)
self.CD.DurationIndex += dur
cc := &engine.CallCost{}
if err := self.rater.Call("Responder.MaxDebit", self.CD, cc); err != nil {
if err := self.rals.Call("Responder.MaxDebit", self.CD, cc); err != nil {
self.LastUsage = 0
self.LastDebit = 0
return 0, err
@@ -189,7 +190,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error {
cd.RunID = self.CD.RunID
cd.Increments.Compress()
var response float64
err := self.rater.Call("Responder.RefundIncrements", cd, &response)
err := self.rals.Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return err
}
@@ -231,7 +232,7 @@ func (self *SMGSession) disconnectSession(reason string) error {
// Merge the sum of costs and sends it to CDRS for storage
// originID could have been changed from original event, hence passing as argument here
// pass cc as the clone of original to avoid concurrency issues
func (self *SMGSession) saveOperations(originID string) error {
func (self *SMGSession) saveOperations(cgrID string) error {
if len(self.CallCosts) == 0 {
return nil // There are no costs to save, ignore the operation
}
@@ -244,26 +245,26 @@ func (self *SMGSession) saveOperations(originID string) error {
cd.RunID = self.CD.RunID
cd.Increments = roundIncrements
var response float64
if err := self.rater.Call("Responder.RefundRounding", cd, &response); err != nil {
if err := self.rals.Call("Responder.RefundRounding", cd, &response); err != nil {
return err
}
}
smCost := &engine.SMCost{
CGRID: self.EventStart.GetCgrId(self.Timezone),
CGRID: self.CGRID,
CostSource: utils.SESSION_MANAGER_SOURCE,
RunID: self.RunID,
OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT),
OriginID: originID,
OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT),
Usage: self.TotalUsage.Seconds(),
CostDetails: cc,
}
if len(smCost.CostDetails.Timespans) > MaxTimespansInCost { // Merge since we will get a callCost too big
if err := utils.Clone(cc, &smCost.CostDetails); err != nil { // Avoid concurrency on CC
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not clone callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not clone callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error()))
}
go func(smCost *engine.SMCost) { // could take longer than the locked stage
if err := self.storeSMCost(smCost); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not store callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not store callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error()))
}
}(smCost)
} else {
@@ -294,10 +295,10 @@ func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession {
aTime, _ := self.EventStart.GetAnswerTime(utils.META_DEFAULT, timezone)
pdd, _ := self.EventStart.GetPdd(utils.META_DEFAULT)
aSession := &ActiveSession{
CgrId: self.EventStart.GetCgrId(timezone),
CGRID: self.CGRID,
TOR: self.EventStart.GetTOR(utils.META_DEFAULT),
RunID: self.RunID,
OriginID: self.EventStart.GetUUID(),
OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT),
CdrHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT),
CdrSource: self.EventStart.GetCdrSource(),
ReqType: self.EventStart.GetReqType(utils.META_DEFAULT),

View File

@@ -44,14 +44,17 @@ type SMGReplicationConn struct {
Synchronous bool
}
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection,
func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection,
smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric {
aSessIdxCfg := cgrCfg.SmGenericConfig.SessionIndexes
aSessIdxCfg[utils.ACCID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching
return &SMGeneric{cgrCfg: cgrCfg,
rater: rater,
rals: rals,
cdrsrv: cdrsrv,
smgReplConns: smgReplConns,
timezone: timezone,
activeSessions: make(map[string][]*SMGSession),
aSessionsIdxCfg: aSessIdxCfg,
aSessionsIndex: make(map[string]map[string]utils.StringMap),
passiveSessions: make(map[string][]*SMGSession),
sessionTerminators: make(map[string]*smgSessionTerminator),
@@ -60,13 +63,14 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection,
type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rater rpcclient.RpcClientConnection
rals rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
timezone string
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
aSessionsMux sync.RWMutex
aSessionsIndex map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[sesionID]
aSessionsIdxCfg utils.StringMap // index configuration
aSessionsIndex map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[cgrID]
aSIMux sync.RWMutex // protects aSessionsIndex
passiveSessions map[string][]*SMGSession // group passive sessions
pSessionsMux sync.RWMutex
@@ -74,6 +78,7 @@ type SMGeneric struct {
sTsMux sync.Mutex // protects sessionTerminators
responseCache *cache.ResponseCache // cache replies here
}
type smgSessionTerminator struct {
timer *time.Timer
endChan chan bool
@@ -93,8 +98,7 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
}
smg.sTsMux.Lock()
defer smg.sTsMux.Unlock()
uuid := s.EventStart.GetUUID()
if _, found := smg.sessionTerminators[uuid]; found { // already there, no need to set up
if _, found := smg.sessionTerminators[s.CGRID]; found { // already there, no need to set up
return
}
timer := time.NewTimer(ttl)
@@ -106,7 +110,7 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
ttlLastUsed: s.EventStart.GetSessionTTLLastUsed(),
ttlUsage: s.EventStart.GetSessionTTLUsage(),
}
smg.sessionTerminators[uuid] = terminator
smg.sessionTerminators[s.CGRID] = terminator
go func() {
select {
case <-timer.C:
@@ -115,15 +119,15 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) {
timer.Stop()
}
smg.sTsMux.Lock()
delete(smg.sessionTerminators, uuid)
delete(smg.sessionTerminators, s.CGRID)
smg.sTsMux.Unlock()
}()
}
// resetTerminatorTimer updates the timer for the session to a new ttl and terminate info
func (smg *SMGeneric) resetTerminatorTimer(uuid string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) {
func (smg *SMGeneric) resetTerminatorTimer(cgrID string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) {
smg.aSessionsMux.RLock()
if st, found := smg.sessionTerminators[uuid]; found {
if st, found := smg.sessionTerminators[cgrID]; found {
if ttl != 0 {
st.ttl = ttl
}
@@ -144,56 +148,55 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
if tmtr.ttlUsage != nil {
debitUsage = *tmtr.ttlUsage
}
aSessions := smg.getASession(s.EventStart.GetUUID())
aSessions := smg.getASession(s.CGRID)
if len(aSessions) == 0 { // will not continue if the session is not longer active
return
}
for _, s := range aSessions {
s.debit(debitUsage, tmtr.ttlLastUsed)
}
smg.sessionEnd(s.EventStart.GetUUID(), s.TotalUsage)
smg.sessionEnd(s.CGRID, s.TotalUsage)
cdr := s.EventStart.AsStoredCdr(smg.cgrCfg, smg.timezone)
cdr.Usage = s.TotalUsage
var reply string
smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
smg.replicateSessions(s.EventStart.GetUUID())
smg.replicateSessions(s.CGRID)
}
func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) {
func (smg *SMGeneric) recordASession(s *SMGSession) {
smg.aSessionsMux.Lock()
smg.activeSessions[uuid] = append(smg.activeSessions[uuid], s)
smg.activeSessions[s.CGRID] = append(smg.activeSessions[s.CGRID], s)
smg.setSessionTerminator(s)
smg.indexASession(uuid, s)
smg.indexASession(s)
smg.aSessionsMux.Unlock()
}
// Remove session from session list, removes all related in case of multiple runs, true if item was found
func (smg *SMGeneric) unrecordASession(uuid string) bool {
func (smg *SMGeneric) unrecordASession(cgrID string) bool {
smg.aSessionsMux.Lock()
defer smg.aSessionsMux.Unlock()
if _, found := smg.activeSessions[uuid]; !found {
if _, found := smg.activeSessions[cgrID]; !found {
return false
}
delete(smg.activeSessions, uuid)
if st, found := smg.sessionTerminators[uuid]; found {
delete(smg.activeSessions, cgrID)
if st, found := smg.sessionTerminators[cgrID]; found {
st.endChan <- true
}
smg.unindexASession(uuid)
smg.unindexASession(cgrID)
return true
}
// indexASession explores settings and builds smg.aSessionsIndex based on that
func (smg *SMGeneric) indexASession(uuid string, s *SMGSession) bool {
func (smg *SMGeneric) indexASession(s *SMGSession) bool {
smg.aSIMux.Lock()
defer smg.aSIMux.Unlock()
ev := s.EventStart
for _, fieldName := range smg.cgrCfg.SmGenericConfig.SessionIndexes {
fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "")
for fieldName := range smg.aSessionsIdxCfg {
fieldVal, err := utils.ReflectFieldAsString(s.EventStart, fieldName, "")
if err != nil {
if err == utils.ErrNotFound {
fieldVal = utils.NOT_AVAILABLE
} else {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Error retrieving field: %s from event: %+v", fieldName, ev))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Error retrieving field: %s from event: %+v", fieldName, s.EventStart))
continue
}
}
@@ -206,21 +209,20 @@ func (smg *SMGeneric) indexASession(uuid string, s *SMGSession) bool {
if _, hasFieldVal := smg.aSessionsIndex[fieldName][fieldVal]; !hasFieldVal {
smg.aSessionsIndex[fieldName][fieldVal] = make(utils.StringMap)
}
smg.aSessionsIndex[fieldName][fieldVal][uuid] = true
smg.aSessionsIndex[fieldName][fieldVal][s.CGRID] = true
}
return true
}
// unindexASession removes a session from indexes
func (smg *SMGeneric) unindexASession(uuid string) bool {
func (smg *SMGeneric) unindexASession(cgrID string) (found bool) {
smg.aSIMux.Lock()
defer smg.aSIMux.Unlock()
var found bool
for fldName := range smg.aSessionsIndex {
for fldVal := range smg.aSessionsIndex[fldName] {
if _, hasUUID := smg.aSessionsIndex[fldName][fldVal][uuid]; hasUUID {
if _, hasCGRID := smg.aSessionsIndex[fldName][fldVal][cgrID]; hasCGRID {
found = true
delete(smg.aSessionsIndex[fldName][fldVal], uuid)
delete(smg.aSessionsIndex[fldName][fldVal], cgrID)
if len(smg.aSessionsIndex[fldName][fldVal]) == 0 {
delete(smg.aSessionsIndex[fldName], fldVal)
}
@@ -230,7 +232,7 @@ func (smg *SMGeneric) unindexASession(uuid string) bool {
}
}
}
return found
return
}
// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters
@@ -257,50 +259,49 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (uti
continue
}
// Higher run, takes out non matching indexes
for sessID := range sessionIDxes[fltrName][fltrVal] {
if _, hasUUID := matchingSessions[sessID]; !hasUUID {
delete(matchingSessions, sessID)
for cgrID := range sessionIDxes[fltrName][fltrVal] {
if _, hasCGRID := matchingSessions[cgrID]; !hasCGRID {
delete(matchingSessions, cgrID)
}
}
}
return matchingSessions.Clone(), matchedIndexes
}
// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix
func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix for OriginID field
func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) (cgrIDs []string) {
smg.aSessionsMux.Lock()
defer smg.aSessionsMux.Unlock()
sessionIDs := make([]string, 0)
for sessionID := range smg.activeSessions {
if strings.HasPrefix(sessionID, prefix) {
sessionIDs = append(sessionIDs, sessionID)
for originID := range smg.aSessionsIndex[utils.ACCID] {
if strings.HasPrefix(originID, prefix) {
cgrIDs = append(cgrIDs, smg.aSessionsIndex[utils.ACCID][originID].Slice()...)
}
}
return sessionIDs
return
}
// Returns sessions/derived for a specific uuid
func (smg *SMGeneric) getASession(uuid string) []*SMGSession {
func (smg *SMGeneric) getASession(cgrID string) []*SMGSession {
smg.aSessionsMux.RLock()
defer smg.aSessionsMux.RUnlock()
return smg.activeSessions[uuid]
return smg.activeSessions[cgrID]
}
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error {
sessionId := evStart.GetUUID()
processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
cgrID := evStart.GetCGRID(utils.META_DEFAULT)
processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
var sessionRuns []*engine.SessionRun
if err := smg.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return true, err
} else if len(sessionRuns) == 0 {
return true, nil
}
stopDebitChan := make(chan struct{})
for _, sessionRun := range sessionRuns {
s := &SMGSession{EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.timezone,
rater: smg.rater, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn}
smg.recordASession(sessionId, s)
s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.timezone,
rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn}
smg.recordASession(s)
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Starting session: %s, runId: %s", sessionId, s.runId))
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
@@ -308,7 +309,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp
}
}
return true, nil
}, smg.cgrCfg.LockingTimeout, sessionId)
}, smg.cgrCfg.LockingTimeout, cgrID)
if processed == nil || processed == false {
utils.Logger.Err("<SMGeneric> Cannot start session, empty reply")
return utils.ErrServerError
@@ -317,13 +318,13 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp
}
// sessionEnd will end a session from outside
func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
_, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
ss := smg.getASession(sessionId)
ss := smg.getASession(cgrID)
if len(ss) == 0 { // Not handled by us
return nil, nil
}
if !smg.unrecordASession(sessionId) { // Unreference it early so we avoid concurrency
if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
}
for idx, s := range ss {
@@ -331,32 +332,32 @@ func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.cgrCfg.DefaultTimezone)
aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.timezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
sessionId, s.RunID, aTime, err))
cgrID, s.RunID, aTime, err))
continue // Unanswered session
}
if err := s.close(aTime.Add(usage)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
if err := s.saveOperations(sessionId); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error()))
if err := s.saveOperations(cgrID); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
}
return nil, nil
}, time.Duration(2)*time.Second, sessionId)
}, time.Duration(2)*time.Second, cgrID)
return err
}
// sessionRelocate is used when an update will relocate an initial session (eg multiple data streams)
func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error {
_, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level
if utils.IsSliceMember([]string{sessionID, initialID}, "") { // Not allowed empty params here
if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here
return nil, utils.ErrMandatoryIeMissing
}
ssNew := smg.getASession(sessionID) // Already relocated
if len(ssNew) != 0 {
ssNew := smg.getASession(cgrID)
if len(ssNew) != 0 { // Already relocated
return nil, nil
}
ss := smg.getASession(initialID)
@@ -364,8 +365,9 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
return nil, utils.ErrNotFound
}
for i, s := range ss {
s.EventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one
smg.recordASession(sessionID, s)
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing
smg.recordASession(s)
if i == 0 {
smg.unrecordASession(initialID)
}
@@ -376,12 +378,12 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
}
// replicateSessions will replicate session based on configuration
func (smg *SMGeneric) replicateSessions(originID string) (err error) {
func (smg *SMGeneric) replicateSessions(cgrID string) (err error) {
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
return
}
smg.aSessionsMux.RLock()
aSessions := smg.activeSessions[originID]
aSessions := smg.activeSessions[cgrID]
smg.aSessionsMux.RUnlock()
var wg sync.WaitGroup
for _, rplConn := range smg.smgReplConns {
@@ -390,7 +392,7 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) {
}
go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) {
var reply string
argSet := ArgsSetPassiveSessions{OriginID: originID, Sessions: ss}
argSet := ArgsSetPassiveSessions{CGRID: cgrID, Sessions: ss}
conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply)
if sync {
wg.Done()
@@ -402,12 +404,12 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) {
}
// sessionActiveToPassive is a mechanism to transit a session from active to passive state
func (smg *SMGeneric) sessionActiveToPassive(originID string) (err error) {
func (smg *SMGeneric) sessionActiveToPassive(cgrID string) (err error) {
return
}
// sessionPassiveToActive is a mechanism to transit a session from passive to active state
func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) {
func (smg *SMGeneric) sessionPassiveToActive(cgrID string) (err error) {
return
}
@@ -415,7 +417,7 @@ func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) {
// MaxUsage calculates maximum usage allowed for given gevent
func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cacheKey := "MaxUsage" + gev.GetCgrId(smg.timezone)
cacheKey := "MaxUsage" + gev.GetCGRID(utils.META_DEFAULT)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return (item.Value.(time.Duration)), item.Err
}
@@ -423,7 +425,7 @@ func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err
gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION
storedCdr := gev.AsStoredCdr(config.CgrConfig(), smg.timezone)
var maxDur float64
if err = smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
if err = smg.rals.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil {
return
}
maxUsage = time.Duration(maxDur)
@@ -431,7 +433,7 @@ func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err
}
func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err error) {
cacheKey := "LCRSuppliers" + gev.GetCgrId(smg.timezone) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT)
cacheKey := "LCRSuppliers" + gev.GetCGRID(utils.META_DEFAULT) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT)
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
suppls = (item.Value.([]string))
@@ -443,12 +445,12 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err err
gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST
var cd *engine.CallDescriptor
cd, err = gev.AsLcrRequest().AsCallDescriptor(smg.timezone)
cd.CgrID = gev.GetCgrId(smg.timezone)
cd.CgrID = gev.GetCGRID(utils.META_DEFAULT)
if err != nil {
return
}
var lcr engine.LCRCost
if err = smg.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
if err = smg.rals.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil {
return
}
if lcr.HasErrors() {
@@ -462,13 +464,14 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err err
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
cacheKey := "InitiateSession" + gev.GetCgrId(smg.timezone)
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "InitiateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching
if err = smg.sessionStart(gev, clnt); err != nil {
smg.sessionEnd(gev.GetUUID(), 0)
smg.sessionEnd(cgrID, 0)
return
}
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop
@@ -477,14 +480,15 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie
}
maxUsage, err = smg.UpdateSession(gev, clnt)
if err != nil || maxUsage == 0 {
smg.sessionEnd(gev.GetUUID(), 0)
smg.sessionEnd(cgrID, 0)
}
return
}
// Execute debits for usage/maxUsage
func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) {
cacheKey := "UpdateSession" + gev.GetCgrId(smg.timezone)
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "UpdateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
@@ -493,10 +497,11 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
err = errors.New("ACTIVE_DEBIT_LOOP")
return
}
defer smg.replicateSessions(gev.GetUUID())
if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil {
defer smg.replicateSessions(initialID)
err = smg.sessionRelocate(gev.GetUUID(), initialID)
defer smg.replicateSessions(gev.GetCGRID(utils.META_DEFAULT))
if gev.HasField(utils.InitialOriginID) {
initialCGRID := gev.GetCGRID(utils.InitialOriginID)
defer smg.replicateSessions(initialCGRID)
err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
}
@@ -504,7 +509,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
return
}
}
smg.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
smg.resetTerminatorTimer(cgrID, gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage())
var lastUsed *time.Duration
var evLastUsed time.Duration
if evLastUsed, err = gev.GetLastUsed(utils.META_DEFAULT); err == nil {
@@ -518,9 +523,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
}
return
}
aSessions := smg.getASession(gev.GetUUID())
aSessions := smg.getASession(cgrID)
if len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> SessionUpdate with no active sessions for event: <%s>", gev.GetUUID()))
utils.Logger.Err(fmt.Sprintf("<SMGeneric> SessionUpdate with no active sessions for event: <%s>", cgrID))
err = utils.ErrServerError
return
}
@@ -537,14 +542,16 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (err error) {
cacheKey := "TerminateSession" + gev.GetCgrId(smg.timezone)
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "TerminateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err})
if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil {
err = smg.sessionRelocate(gev.GetUUID(), initialID)
defer smg.replicateSessions(initialID)
if gev.HasField(utils.InitialOriginID) {
initialCGRID := gev.GetCGRID(utils.InitialOriginID)
defer smg.replicateSessions(initialCGRID)
err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
}
@@ -552,9 +559,11 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli
return
}
}
sessionIDs := []string{gev.GetUUID()}
if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix)
sessionIDs := []string{cgrID}
if gev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered
if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil {
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix)
}
}
usage, errUsage := gev.GetUsage(utils.META_DEFAULT)
var lastUsed time.Duration
@@ -598,13 +607,14 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli
// Processes one time events (eg: SMS)
func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, err error) {
cacheKey := "ChargeEvent" + gev.GetCgrId(smg.timezone)
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "ChargeEvent" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err})
var sessionRuns []*engine.SessionRun
if err = smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
if err = smg.rals.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil {
return
} else if len(sessionRuns) == 0 {
return
@@ -612,7 +622,7 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
var maxDurInit bool // Avoid differences between default 0 and received 0
for _, sR := range sessionRuns {
cc := new(engine.CallCost)
if err = smg.rater.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil {
if err = smg.rals.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not Debit CD: %+v, RunID: %s, error: %s", sR.CallDescriptor, sR.DerivedCharger.RunID, err.Error()))
break
}
@@ -645,12 +655,12 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
if len(refundIncrements) > 0 {
cd := cc.CreateCallDescriptor()
cd.Increments = refundIncrements
cd.CgrID = sR.CallDescriptor.CgrID
cd.CgrID = cgrID
cd.RunID = sR.CallDescriptor.RunID
cd.Increments.Compress()
//utils.Logger.Info(fmt.Sprintf("Refunding session run callcost: %s", utils.ToJSON(cd)))
var response float64
err = smg.rater.Call("Responder.RefundIncrements", cd, &response)
err = smg.rals.Call("Responder.RefundIncrements", cd, &response)
if err != nil {
return
}
@@ -675,17 +685,17 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
cd := cc.CreateCallDescriptor()
cd.Increments = roundIncrements
var response float64
if errRefund := smg.rater.Call("Responder.RefundRounding", cd, &response); errRefund != nil {
if errRefund := smg.rals.Call("Responder.RefundRounding", cd, &response); errRefund != nil {
utils.Logger.Err(fmt.Sprintf("<SM> ERROR failed to refund rounding: %v", errRefund))
}
}
var reply string
smCost := &engine.SMCost{
CGRID: gev.GetCgrId(smg.timezone),
CGRID: cgrID,
CostSource: utils.SESSION_MANAGER_SOURCE,
RunID: sR.DerivedCharger.RunID,
OriginHost: gev.GetOriginatorIP(utils.META_DEFAULT),
OriginID: gev.GetUUID(),
OriginID: gev.GetOriginID(utils.META_DEFAULT),
CostDetails: cc,
}
if errStore := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost,
@@ -702,7 +712,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e
}
func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) {
cacheKey := "ProcessCDR" + gev.GetCgrId(smg.timezone)
cgrID := gev.GetCGRID(utils.META_DEFAULT)
cacheKey := "ProcessCDR" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
@@ -778,14 +789,14 @@ func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSess
return
}
func (smg *SMGeneric) getPassiveSessions(originID, runID string) (pss map[string][]*SMGSession) {
func (smg *SMGeneric) getPassiveSessions(cgrID, runID string) (pss map[string][]*SMGSession) {
smg.pSessionsMux.RLock()
if originID == "" {
if cgrID == "" {
if len(smg.passiveSessions) != 0 {
pss = smg.passiveSessions
}
} else {
pSSlc := smg.passiveSessions[originID]
pSSlc := smg.passiveSessions[cgrID]
if runID != "" {
var found bool
for _, s := range pSSlc {
@@ -799,7 +810,7 @@ func (smg *SMGeneric) getPassiveSessions(originID, runID string) (pss map[string
}
}
if len(pSSlc) != 0 {
pss = map[string][]*SMGSession{originID: pSSlc}
pss = map[string][]*SMGSession{cgrID: pSSlc}
}
}
smg.pSessionsMux.RUnlock()
@@ -889,7 +900,7 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
return nil
}
// Interim updates, returns remaining duration from the rater
// Interim updates, returns remaining duration from the RALs
func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
if minMaxUsage, err := smg.UpdateSession(ev, clnt); err != nil {
return utils.NewErrServerError(err)
@@ -946,7 +957,7 @@ func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(attrs utils.AttrSMGGetActiveSes
}
type ArgsSetPassiveSessions struct {
OriginID string
CGRID string
Sessions []*SMGSession
}
@@ -954,9 +965,9 @@ type ArgsSetPassiveSessions struct {
func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, reply *string) error {
smg.pSessionsMux.Lock()
if len(args.Sessions) == 0 { // Remove
delete(smg.passiveSessions, args.OriginID)
delete(smg.passiveSessions, args.CGRID)
} else { // Set with overwrite
smg.passiveSessions[args.OriginID] = args.Sessions
smg.passiveSessions[args.CGRID] = args.Sessions
}
smg.pSessionsMux.Unlock()
*reply = utils.OK
@@ -964,15 +975,15 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, rep
}
type ArgsGetPassiveSessions struct {
OriginID string
RunID string
CGRID string
RunID string
}
func (smg *SMGeneric) BiRPCV1GetPassiveSessions(attrs ArgsGetPassiveSessions, pSessions *map[string][]*SMGSession) error {
if attrs.RunID != "" && attrs.OriginID == "" {
if attrs.RunID != "" && attrs.CGRID == "" {
return utils.ErrMandatoryIeMissing
}
pSS := smg.getPassiveSessions(attrs.OriginID, attrs.RunID)
pSS := smg.getPassiveSessions(attrs.CGRID, attrs.RunID)
if len(pSS) == 0 {
return utils.ErrNotFound
}

View File

@@ -29,7 +29,8 @@ var smgCfg *config.CGRConfig
func init() {
smgCfg, _ = config.NewDefaultCGRConfig()
smgCfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"}
smgCfg.SmGenericConfig.SessionIndexes = utils.StringMap{"Tenant": true,
"Account": true, "Extra3": true, "Extra4": true}
}
@@ -59,28 +60,33 @@ func TestSMGSessionIndexing(t *testing.T) {
"Extra3": "",
}
// Index first session
smgSession := &SMGSession{EventStart: smGev}
uuid := smGev.GetUUID()
smg.indexASession(uuid, smgSession)
smgSession := &SMGSession{CGRID: smGev.GetCGRID(utils.META_DEFAULT), EventStart: smGev}
cgrID := smGev.GetCGRID(utils.META_DEFAULT)
smg.indexASession(smgSession)
eIndexes := map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12345": utils.StringMap{
cgrID: true,
},
},
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
uuid: true,
cgrID: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
uuid: true,
cgrID: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid: true,
cgrID: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
uuid: true,
cgrID: true,
},
},
}
@@ -98,38 +104,46 @@ func TestSMGSessionIndexing(t *testing.T) {
"Extra3": "",
"Extra4": "info2",
}
uuid2 := smGev2.GetUUID()
smgSession2 := &SMGSession{EventStart: smGev2}
smg.indexASession(uuid2, smgSession2)
cgrID2 := smGev2.GetCGRID(utils.META_DEFAULT)
smgSession2 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2}
smg.indexASession(smgSession2)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12345": utils.StringMap{
cgrID: true,
},
"12346": utils.StringMap{
cgrID2: true,
},
},
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
uuid: true,
cgrID: true,
},
"itsyscom.com": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
uuid: true,
cgrID: true,
},
"account2": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid: true,
uuid2: true,
cgrID: true,
cgrID2: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
uuid: true,
cgrID: true,
},
"info2": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
}
@@ -137,26 +151,31 @@ func TestSMGSessionIndexing(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.aSessionsIndex)
}
// Unidex first session
smg.unindexASession(uuid)
smg.unindexASession(cgrID)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12346": utils.StringMap{
cgrID2: true,
},
},
"Tenant": map[string]utils.StringMap{
"itsyscom.com": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
"Account": map[string]utils.StringMap{
"account2": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
"Extra4": map[string]utils.StringMap{
"info2": utils.StringMap{
uuid2: true,
cgrID2: true,
},
},
}
@@ -190,7 +209,7 @@ func TestSMGActiveSessions(t *testing.T) {
"Extra2": 5,
"Extra3": "",
}
smg.recordASession(smGev1.GetUUID(), &SMGSession{EventStart: smGev1})
smg.recordASession(&SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1})
smGev2 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -211,7 +230,7 @@ func TestSMGActiveSessions(t *testing.T) {
"Extra1": "Value1",
"Extra3": "extra3",
}
smg.recordASession(smGev2.GetUUID(), &SMGSession{EventStart: smGev2})
smg.recordASession(&SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2})
if aSessions, _, err := smg.ActiveSessions(nil, false); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
@@ -268,8 +287,9 @@ func TestGetPassiveSessions(t *testing.T) {
"Extra3": "",
}
// Index first session
smgSession11 := &SMGSession{EventStart: smGev1, RunID: utils.META_DEFAULT}
smgSession12 := &SMGSession{EventStart: smGev1, RunID: "second_run"}
smgSession11 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: utils.META_DEFAULT}
smgSession12 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: "second_run"}
smg.passiveSessions[smgSession11.CGRID] = []*SMGSession{smgSession11, smgSession12}
smGev2 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -293,19 +313,18 @@ func TestGetPassiveSessions(t *testing.T) {
"Extra2": 5,
"Extra3": "",
}
smgSession21 := &SMGSession{EventStart: smGev2, RunID: utils.META_DEFAULT}
smg.passiveSessions[smGev1.GetUUID()] = []*SMGSession{smgSession11, smgSession12}
if pSS := smg.getPassiveSessions("", ""); len(pSS) != 1 {
t.Errorf("PassiveSessions: %+v", pSS)
}
smg.passiveSessions[smGev2.GetUUID()] = []*SMGSession{smgSession21}
smgSession21 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2, RunID: utils.META_DEFAULT}
smg.passiveSessions[smgSession21.CGRID] = []*SMGSession{smgSession21}
if pSS := smg.getPassiveSessions("", ""); len(pSS) != 2 {
t.Errorf("PassiveSessions: %+v", pSS)
}
if pSS := smg.getPassiveSessions(smGev1.GetUUID(), ""); len(pSS) != 1 || len(pSS[smGev1.GetUUID()]) != 2 {
if pSS := smg.getPassiveSessions(smgSession11.CGRID, ""); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 2 {
t.Errorf("PassiveSessions: %+v", pSS)
}
if pSS := smg.getPassiveSessions(smGev1.GetUUID(), smgSession12.RunID); len(pSS) != 1 || len(pSS[smGev1.GetUUID()]) != 1 {
if pSS := smg.getPassiveSessions(smgSession11.CGRID, smgSession12.RunID); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 1 {
t.Errorf("PassiveSessions: %+v", pSS)
}
if pSS := smg.getPassiveSessions("aabbcc", ""); len(pSS) != 0 {

View File

@@ -132,14 +132,15 @@ func TestSMGRplcInitiate(t *testing.T) {
t.Error("Bad max usage: ", maxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
cgrID := smgEv.GetCGRID(utils.META_DEFAULT)
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if _, hasOriginID := pSessions[smgEv.GetUUID()]; !hasOriginID {
} else if _, hasOriginID := pSessions[cgrID]; !hasOriginID {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[smgEv.GetUUID()][0].TotalUsage != time.Duration(90*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[smgEv.GetUUID()][0])
} else if pSessions[cgrID][0].TotalUsage != time.Duration(90*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[cgrID][0])
}
}
@@ -160,15 +161,16 @@ func TestSMGRplcUpdate(t *testing.T) {
t.Error("Bad max usage: ", maxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
cgrID := smgEv.GetCGRID(utils.META_DEFAULT)
var pSessions map[string][]*SMGSession
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if _, hasOriginID := pSessions[smgEv.GetUUID()]; !hasOriginID {
} else if _, hasOriginID := pSessions[cgrID]; !hasOriginID {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[smgEv.GetUUID()][0].TotalUsage != time.Duration(150*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[smgEv.GetUUID()][0])
} else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[cgrID][0])
}
}