From d253be03ac4e191aadebc6c5a60e9631753c5928 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 7 Nov 2016 17:22:54 +0100 Subject: [PATCH] SMGeneric using CGRID instead of OriginID for session indexing --- config/smconfig.go | 4 +- sessionmanager/session.go | 4 +- sessionmanager/smasterisk.go | 2 +- sessionmanager/smg_event.go | 30 ++-- sessionmanager/smg_event_test.go | 10 +- sessionmanager/smg_it_test.go | 9 +- sessionmanager/smg_session.go | 29 ++-- sessionmanager/smgeneric.go | 241 +++++++++++++++-------------- sessionmanager/smgeneric_test.go | 85 ++++++---- sessionmanager/smgreplc_it_test.go | 14 +- 10 files changed, 236 insertions(+), 192 deletions(-) diff --git a/config/smconfig.go b/config/smconfig.go index 7844653a3..21e874cee 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -99,7 +99,7 @@ type SmGenericConfig struct { SessionTTL time.Duration SessionTTLLastUsed *time.Duration SessionTTLUsage *time.Duration - SessionIndexes []string + SessionIndexes utils.StringMap } func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error { @@ -162,7 +162,7 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error { } } if jsnCfg.Session_indexes != nil { - self.SessionIndexes = *jsnCfg.Session_indexes + self.SessionIndexes = utils.StringMapFromSlice(*jsnCfg.Session_indexes) } return nil } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 04ff8abf1..bf1257b03 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -279,7 +279,7 @@ func (s *Session) AsActiveSessions() []*ActiveSession { pdd, _ := s.eventStart.GetPdd(utils.META_DEFAULT) for _, sessionRun := range s.sessionRuns { aSession := &ActiveSession{ - CgrId: s.eventStart.GetCgrId(s.sessionManager.Timezone()), + CGRID: s.eventStart.GetCgrId(s.sessionManager.Timezone()), TOR: utils.VOICE, OriginID: s.eventStart.GetUUID(), CdrHost: s.eventStart.GetOriginatorIP(utils.META_DEFAULT), @@ -325,7 +325,7 @@ func (s *Session) AsMapStringIface() (map[string]interface{}, error) { // Will be used when displaying active sessions via RPC type ActiveSession struct { - CgrId string + CGRID string TOR string // type of record, meta-field, should map to one of the TORs hardcoded inside the server <*voice|*data|*sms|*generic> OriginID string // represents the unique accounting id given by the telecom switch generating the CDR CdrHost string // represents the IP address of the host generating the CDR (automatically populated by the server) diff --git a/sessionmanager/smasterisk.go b/sessionmanager/smasterisk.go index 87afaf2e8..aa1a34678 100644 --- a/sessionmanager/smasterisk.go +++ b/sessionmanager/smasterisk.go @@ -230,7 +230,7 @@ func (sma *SMAsterisk) ServiceShutdown() error { // Internal method to disconnect session in asterisk func (sma *SMAsterisk) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error { - channelID := SMGenericEvent(args.EventStart).GetUUID() + channelID := SMGenericEvent(args.EventStart).GetOriginID(utils.META_DEFAULT) if err := sma.hangupChannel(channelID); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), channelID)) } diff --git a/sessionmanager/smg_event.go b/sessionmanager/smg_event.go index 20e909fbf..a429aed32 100644 --- a/sessionmanager/smg_event.go +++ b/sessionmanager/smg_event.go @@ -35,6 +35,11 @@ var ( type SMGenericEvent map[string]interface{} +func (ev SMGenericEvent) HasField(fieldName string) (hasField bool) { + _, hasField = ev[fieldName] + return +} + func (self SMGenericEvent) GetName() string { result, _ := utils.ConvertIfaceToString(self[utils.EVENT_NAME]) return result @@ -48,19 +53,22 @@ func (self SMGenericEvent) GetTOR(fieldName string) string { return result } -func (self SMGenericEvent) GetCgrId(timezone string) string { - //setupTime, _ := self.GetSetupTime(utils.META_DEFAULT, timezone) - //return utils.Sha1(self.GetUUID(), setupTime.UTC().String()) - return utils.Sha1(self.GetUUID()) +func (self SMGenericEvent) GetCGRID(oIDFieldName string) string { + return utils.Sha1(self.GetOriginID(oIDFieldName), self.GetOriginatorIP(utils.META_DEFAULT)) } -func (self SMGenericEvent) GetUUID() string { - result, _ := utils.ConvertIfaceToString(self[utils.ACCID]) +// GetOriginID returns the OriginID from event +// fieldName offers the possibility to extract info from other fields, eg: InitialOriginID +func (self SMGenericEvent) GetOriginID(fieldName string) string { + if fieldName == utils.META_DEFAULT { + fieldName = utils.ACCID + } + result, _ := utils.ConvertIfaceToString(self[fieldName]) return result } func (self SMGenericEvent) GetSessionIds() []string { - return []string{self.GetUUID()} + return []string{self.GetOriginID(utils.META_DEFAULT)} } func (self SMGenericEvent) GetDirection(fieldName string) string { @@ -321,11 +329,11 @@ func (self SMGenericEvent) MissingParameter(timezone string) bool { func (self SMGenericEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) string { switch rsrFld.Id { case utils.CGRID: - return rsrFld.ParseValue(self.GetCgrId(timezone)) + return rsrFld.ParseValue(self.GetCGRID(utils.META_DEFAULT)) case utils.TOR: return rsrFld.ParseValue(utils.VOICE) case utils.ACCID: - return rsrFld.ParseValue(self.GetUUID()) + return rsrFld.ParseValue(self.GetOriginID(utils.META_DEFAULT)) case utils.CDRHOST: return rsrFld.ParseValue(self.GetOriginatorIP(utils.META_DEFAULT)) case utils.CDRSOURCE: @@ -378,9 +386,9 @@ func (self SMGenericEvent) PassesFieldFilter(*utils.RSRField) (bool, string) { func (self SMGenericEvent) AsStoredCdr(cfg *config.CGRConfig, timezone string) *engine.CDR { storCdr := engine.NewCDRWithDefaults(cfg) - storCdr.CGRID = self.GetCgrId(timezone) + storCdr.CGRID = self.GetCGRID(utils.META_DEFAULT) storCdr.ToR = utils.FirstNonEmpty(self.GetTOR(utils.META_DEFAULT), storCdr.ToR) // Keep default if none in the event - storCdr.OriginID = self.GetUUID() + storCdr.OriginID = self.GetOriginID(utils.META_DEFAULT) storCdr.OriginHost = self.GetOriginatorIP(utils.META_DEFAULT) storCdr.Source = self.GetCdrSource() storCdr.RequestType = utils.FirstNonEmpty(self.GetReqType(utils.META_DEFAULT), storCdr.RequestType) diff --git a/sessionmanager/smg_event_test.go b/sessionmanager/smg_event_test.go index bf90efa00..8f960f383 100644 --- a/sessionmanager/smg_event_test.go +++ b/sessionmanager/smg_event_test.go @@ -54,11 +54,11 @@ func TestSMGenericEventParseFields(t *testing.T) { if smGev.GetName() != "TEST_EVENT" { t.Error("Unexpected: ", smGev.GetName()) } - if smGev.GetCgrId("UTC") != "8cb2237d0679ca88db6464eac60da96345513964" { - t.Error("Unexpected: ", smGev.GetCgrId("UTC")) + if smGev.GetCGRID(utils.META_DEFAULT) != "cade401f46f046311ed7f62df3dfbb84adb98aad" { + t.Error("Unexpected: ", smGev.GetCGRID(utils.META_DEFAULT)) } - if smGev.GetUUID() != "12345" { - t.Error("Unexpected: ", smGev.GetUUID()) + if smGev.GetOriginID(utils.META_DEFAULT) != "12345" { + t.Error("Unexpected: ", smGev.GetOriginID(utils.META_DEFAULT)) } if !reflect.DeepEqual(smGev.GetSessionIds(), []string{"12345"}) { t.Error("Unexpected: ", smGev.GetSessionIds()) @@ -152,7 +152,7 @@ func TestSMGenericEventAsStoredCdr(t *testing.T) { smGev[utils.CDRHOST] = "10.0.3.15" smGev["Extra1"] = "Value1" smGev["Extra2"] = 5 - eStoredCdr := &engine.CDR{CGRID: "8cb2237d0679ca88db6464eac60da96345513964", + eStoredCdr := &engine.CDR{CGRID: "70c4d16dce41d1f2777b4e8442cff39cf87f5f19", ToR: utils.SMS, OriginID: "12345", OriginHost: "10.0.3.15", Source: "SMG_TEST_EVENT", RequestType: utils.META_PREPAID, Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "account1", Subject: "subject1", Destination: "+4986517174963", SetupTime: time.Date(2015, 11, 9, 14, 21, 24, 0, time.UTC), AnswerTime: time.Date(2015, 11, 9, 14, 22, 2, 0, time.UTC), diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go index 57bfafca6..31f92e7d6 100644 --- a/sessionmanager/smg_it_test.go +++ b/sessionmanager/smg_it_test.go @@ -757,7 +757,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } var aSessions []*ActiveSession - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), + OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -791,7 +792,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), + OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -805,7 +807,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer(smgEv.GetUUID())}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), + OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 0 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index e7a101a4a..c15618d70 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -30,6 +30,7 @@ import ( // One session handled by SM type SMGSession struct { + CGRID string // Unique identifier for this session EventStart SMGenericEvent // Event which started stopDebit chan struct{} // Channel to communicate with debit loops when closing the session RunID string // Keep a reference for the derived run @@ -42,7 +43,7 @@ type SMGSession struct { LastDebit time.Duration // last real debited duration TotalUsage time.Duration // sum of lastUsage clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. - rater rpcclient.RpcClientConnection // Connector to Rater service + rals rpcclient.RpcClientConnection // Connector to rals service cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service } @@ -56,19 +57,19 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) { return case <-time.After(sleepDur): if maxDebit, err := self.debit(debitInterval, nil); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not complete debit operation on session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not complete debit operation on session: %s, error: %s", self.CGRID, err.Error())) disconnectReason := SYSTEM_ERROR if err.Error() == utils.ErrUnauthorizedDestination.Error() { disconnectReason = err.Error() } if err := self.disconnectSession(disconnectReason); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.CGRID, err.Error())) } return } else if maxDebit < debitInterval { time.Sleep(maxDebit) if err := self.disconnectSession(INSUFFICIENT_FUNDS); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.CGRID, err.Error())) } return } @@ -108,7 +109,7 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. self.CD.TimeEnd = self.CD.TimeStart.Add(dur) self.CD.DurationIndex += dur cc := &engine.CallCost{} - if err := self.rater.Call("Responder.MaxDebit", self.CD, cc); err != nil { + if err := self.rals.Call("Responder.MaxDebit", self.CD, cc); err != nil { self.LastUsage = 0 self.LastDebit = 0 return 0, err @@ -189,7 +190,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { cd.RunID = self.CD.RunID cd.Increments.Compress() var response float64 - err := self.rater.Call("Responder.RefundIncrements", cd, &response) + err := self.rals.Call("Responder.RefundIncrements", cd, &response) if err != nil { return err } @@ -231,7 +232,7 @@ func (self *SMGSession) disconnectSession(reason string) error { // Merge the sum of costs and sends it to CDRS for storage // originID could have been changed from original event, hence passing as argument here // pass cc as the clone of original to avoid concurrency issues -func (self *SMGSession) saveOperations(originID string) error { +func (self *SMGSession) saveOperations(cgrID string) error { if len(self.CallCosts) == 0 { return nil // There are no costs to save, ignore the operation } @@ -244,26 +245,26 @@ func (self *SMGSession) saveOperations(originID string) error { cd.RunID = self.CD.RunID cd.Increments = roundIncrements var response float64 - if err := self.rater.Call("Responder.RefundRounding", cd, &response); err != nil { + if err := self.rals.Call("Responder.RefundRounding", cd, &response); err != nil { return err } } smCost := &engine.SMCost{ - CGRID: self.EventStart.GetCgrId(self.Timezone), + CGRID: self.CGRID, CostSource: utils.SESSION_MANAGER_SOURCE, RunID: self.RunID, OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), - OriginID: originID, + OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT), Usage: self.TotalUsage.Seconds(), CostDetails: cc, } if len(smCost.CostDetails.Timespans) > MaxTimespansInCost { // Merge since we will get a callCost too big if err := utils.Clone(cc, &smCost.CostDetails); err != nil { // Avoid concurrency on CC - utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error())) } go func(smCost *engine.SMCost) { // could take longer than the locked stage if err := self.storeSMCost(smCost); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not store callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not store callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error())) } }(smCost) } else { @@ -294,10 +295,10 @@ func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession { aTime, _ := self.EventStart.GetAnswerTime(utils.META_DEFAULT, timezone) pdd, _ := self.EventStart.GetPdd(utils.META_DEFAULT) aSession := &ActiveSession{ - CgrId: self.EventStart.GetCgrId(timezone), + CGRID: self.CGRID, TOR: self.EventStart.GetTOR(utils.META_DEFAULT), RunID: self.RunID, - OriginID: self.EventStart.GetUUID(), + OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT), CdrHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), CdrSource: self.EventStart.GetCdrSource(), ReqType: self.EventStart.GetReqType(utils.META_DEFAULT), diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 9e3e3092e..600f26195 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -44,14 +44,17 @@ type SMGReplicationConn struct { Synchronous bool } -func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, +func NewSMGeneric(cgrCfg *config.CGRConfig, rals rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { + aSessIdxCfg := cgrCfg.SmGenericConfig.SessionIndexes + aSessIdxCfg[utils.ACCID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching return &SMGeneric{cgrCfg: cgrCfg, - rater: rater, + rals: rals, cdrsrv: cdrsrv, smgReplConns: smgReplConns, timezone: timezone, activeSessions: make(map[string][]*SMGSession), + aSessionsIdxCfg: aSessIdxCfg, aSessionsIndex: make(map[string]map[string]utils.StringMap), passiveSessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), @@ -60,13 +63,14 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, type SMGeneric struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple - rater rpcclient.RpcClientConnection + rals rpcclient.RpcClientConnection cdrsrv rpcclient.RpcClientConnection smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data timezone string activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex - aSessionsIndex map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[sesionID] + aSessionsIdxCfg utils.StringMap // index configuration + aSessionsIndex map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[cgrID] aSIMux sync.RWMutex // protects aSessionsIndex passiveSessions map[string][]*SMGSession // group passive sessions pSessionsMux sync.RWMutex @@ -74,6 +78,7 @@ type SMGeneric struct { sTsMux sync.Mutex // protects sessionTerminators responseCache *cache.ResponseCache // cache replies here } + type smgSessionTerminator struct { timer *time.Timer endChan chan bool @@ -93,8 +98,7 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) { } smg.sTsMux.Lock() defer smg.sTsMux.Unlock() - uuid := s.EventStart.GetUUID() - if _, found := smg.sessionTerminators[uuid]; found { // already there, no need to set up + if _, found := smg.sessionTerminators[s.CGRID]; found { // already there, no need to set up return } timer := time.NewTimer(ttl) @@ -106,7 +110,7 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) { ttlLastUsed: s.EventStart.GetSessionTTLLastUsed(), ttlUsage: s.EventStart.GetSessionTTLUsage(), } - smg.sessionTerminators[uuid] = terminator + smg.sessionTerminators[s.CGRID] = terminator go func() { select { case <-timer.C: @@ -115,15 +119,15 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) { timer.Stop() } smg.sTsMux.Lock() - delete(smg.sessionTerminators, uuid) + delete(smg.sessionTerminators, s.CGRID) smg.sTsMux.Unlock() }() } // resetTerminatorTimer updates the timer for the session to a new ttl and terminate info -func (smg *SMGeneric) resetTerminatorTimer(uuid string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) { +func (smg *SMGeneric) resetTerminatorTimer(cgrID string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) { smg.aSessionsMux.RLock() - if st, found := smg.sessionTerminators[uuid]; found { + if st, found := smg.sessionTerminators[cgrID]; found { if ttl != 0 { st.ttl = ttl } @@ -144,56 +148,55 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { if tmtr.ttlUsage != nil { debitUsage = *tmtr.ttlUsage } - aSessions := smg.getASession(s.EventStart.GetUUID()) + aSessions := smg.getASession(s.CGRID) if len(aSessions) == 0 { // will not continue if the session is not longer active return } for _, s := range aSessions { s.debit(debitUsage, tmtr.ttlLastUsed) } - smg.sessionEnd(s.EventStart.GetUUID(), s.TotalUsage) + smg.sessionEnd(s.CGRID, s.TotalUsage) cdr := s.EventStart.AsStoredCdr(smg.cgrCfg, smg.timezone) cdr.Usage = s.TotalUsage var reply string smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) - smg.replicateSessions(s.EventStart.GetUUID()) + smg.replicateSessions(s.CGRID) } -func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) { +func (smg *SMGeneric) recordASession(s *SMGSession) { smg.aSessionsMux.Lock() - smg.activeSessions[uuid] = append(smg.activeSessions[uuid], s) + smg.activeSessions[s.CGRID] = append(smg.activeSessions[s.CGRID], s) smg.setSessionTerminator(s) - smg.indexASession(uuid, s) + smg.indexASession(s) smg.aSessionsMux.Unlock() } // Remove session from session list, removes all related in case of multiple runs, true if item was found -func (smg *SMGeneric) unrecordASession(uuid string) bool { +func (smg *SMGeneric) unrecordASession(cgrID string) bool { smg.aSessionsMux.Lock() defer smg.aSessionsMux.Unlock() - if _, found := smg.activeSessions[uuid]; !found { + if _, found := smg.activeSessions[cgrID]; !found { return false } - delete(smg.activeSessions, uuid) - if st, found := smg.sessionTerminators[uuid]; found { + delete(smg.activeSessions, cgrID) + if st, found := smg.sessionTerminators[cgrID]; found { st.endChan <- true } - smg.unindexASession(uuid) + smg.unindexASession(cgrID) return true } // indexASession explores settings and builds smg.aSessionsIndex based on that -func (smg *SMGeneric) indexASession(uuid string, s *SMGSession) bool { +func (smg *SMGeneric) indexASession(s *SMGSession) bool { smg.aSIMux.Lock() defer smg.aSIMux.Unlock() - ev := s.EventStart - for _, fieldName := range smg.cgrCfg.SmGenericConfig.SessionIndexes { - fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "") + for fieldName := range smg.aSessionsIdxCfg { + fieldVal, err := utils.ReflectFieldAsString(s.EventStart, fieldName, "") if err != nil { if err == utils.ErrNotFound { fieldVal = utils.NOT_AVAILABLE } else { - utils.Logger.Err(fmt.Sprintf(" Error retrieving field: %s from event: %+v", fieldName, ev)) + utils.Logger.Err(fmt.Sprintf(" Error retrieving field: %s from event: %+v", fieldName, s.EventStart)) continue } } @@ -206,21 +209,20 @@ func (smg *SMGeneric) indexASession(uuid string, s *SMGSession) bool { if _, hasFieldVal := smg.aSessionsIndex[fieldName][fieldVal]; !hasFieldVal { smg.aSessionsIndex[fieldName][fieldVal] = make(utils.StringMap) } - smg.aSessionsIndex[fieldName][fieldVal][uuid] = true + smg.aSessionsIndex[fieldName][fieldVal][s.CGRID] = true } return true } // unindexASession removes a session from indexes -func (smg *SMGeneric) unindexASession(uuid string) bool { +func (smg *SMGeneric) unindexASession(cgrID string) (found bool) { smg.aSIMux.Lock() defer smg.aSIMux.Unlock() - var found bool for fldName := range smg.aSessionsIndex { for fldVal := range smg.aSessionsIndex[fldName] { - if _, hasUUID := smg.aSessionsIndex[fldName][fldVal][uuid]; hasUUID { + if _, hasCGRID := smg.aSessionsIndex[fldName][fldVal][cgrID]; hasCGRID { found = true - delete(smg.aSessionsIndex[fldName][fldVal], uuid) + delete(smg.aSessionsIndex[fldName][fldVal], cgrID) if len(smg.aSessionsIndex[fldName][fldVal]) == 0 { delete(smg.aSessionsIndex[fldName], fldVal) } @@ -230,7 +232,7 @@ func (smg *SMGeneric) unindexASession(uuid string) bool { } } } - return found + return } // getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters @@ -257,50 +259,49 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (uti continue } // Higher run, takes out non matching indexes - for sessID := range sessionIDxes[fltrName][fltrVal] { - if _, hasUUID := matchingSessions[sessID]; !hasUUID { - delete(matchingSessions, sessID) + for cgrID := range sessionIDxes[fltrName][fltrVal] { + if _, hasCGRID := matchingSessions[cgrID]; !hasCGRID { + delete(matchingSessions, cgrID) } } } return matchingSessions.Clone(), matchedIndexes } -// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix -func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) []string { +// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix for OriginID field +func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) (cgrIDs []string) { smg.aSessionsMux.Lock() defer smg.aSessionsMux.Unlock() - sessionIDs := make([]string, 0) - for sessionID := range smg.activeSessions { - if strings.HasPrefix(sessionID, prefix) { - sessionIDs = append(sessionIDs, sessionID) + for originID := range smg.aSessionsIndex[utils.ACCID] { + if strings.HasPrefix(originID, prefix) { + cgrIDs = append(cgrIDs, smg.aSessionsIndex[utils.ACCID][originID].Slice()...) } } - return sessionIDs + return } // Returns sessions/derived for a specific uuid -func (smg *SMGeneric) getASession(uuid string) []*SMGSession { +func (smg *SMGeneric) getASession(cgrID string) []*SMGSession { smg.aSessionsMux.RLock() defer smg.aSessionsMux.RUnlock() - return smg.activeSessions[uuid] + return smg.activeSessions[cgrID] } // sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error { - sessionId := evStart.GetUUID() - processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level + cgrID := evStart.GetCGRID(utils.META_DEFAULT) + processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level var sessionRuns []*engine.SessionRun - if err := smg.rater.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { + if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { return true, err } else if len(sessionRuns) == 0 { return true, nil } stopDebitChan := make(chan struct{}) for _, sessionRun := range sessionRuns { - s := &SMGSession{EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.timezone, - rater: smg.rater, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn} - smg.recordASession(sessionId, s) + s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.timezone, + rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn} + smg.recordASession(s) //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { s.stopDebit = stopDebitChan @@ -308,7 +309,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp } } return true, nil - }, smg.cgrCfg.LockingTimeout, sessionId) + }, smg.cgrCfg.LockingTimeout, cgrID) if processed == nil || processed == false { utils.Logger.Err(" Cannot start session, empty reply") return utils.ErrServerError @@ -317,13 +318,13 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp } // sessionEnd will end a session from outside -func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { +func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level - ss := smg.getASession(sessionId) + ss := smg.getASession(cgrID) if len(ss) == 0 { // Not handled by us return nil, nil } - if !smg.unrecordASession(sessionId) { // Unreference it early so we avoid concurrency + if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency return nil, nil // Did not find the session so no need to close it anymore } for idx, s := range ss { @@ -331,32 +332,32 @@ func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { if idx == 0 && s.stopDebit != nil { close(s.stopDebit) // Stop automatic debits } - aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.cgrCfg.DefaultTimezone) + aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.timezone) if err != nil || aTime.IsZero() { utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v", - sessionId, s.RunID, aTime, err)) + cgrID, s.RunID, aTime, err)) continue // Unanswered session } if err := s.close(aTime.Add(usage)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) } - if err := s.saveOperations(sessionId); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error())) + if err := s.saveOperations(cgrID); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) } } return nil, nil - }, time.Duration(2)*time.Second, sessionId) + }, time.Duration(2)*time.Second, cgrID) return err } // sessionRelocate is used when an update will relocate an initial session (eg multiple data streams) -func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { +func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level - if utils.IsSliceMember([]string{sessionID, initialID}, "") { // Not allowed empty params here + if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here return nil, utils.ErrMandatoryIeMissing } - ssNew := smg.getASession(sessionID) // Already relocated - if len(ssNew) != 0 { + ssNew := smg.getASession(cgrID) + if len(ssNew) != 0 { // Already relocated return nil, nil } ss := smg.getASession(initialID) @@ -364,8 +365,9 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { return nil, utils.ErrNotFound } for i, s := range ss { - s.EventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one - smg.recordASession(sessionID, s) + s.CGRID = cgrID // Overwrite initial CGRID with new one + s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing + smg.recordASession(s) if i == 0 { smg.unrecordASession(initialID) } @@ -376,12 +378,12 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { } // replicateSessions will replicate session based on configuration -func (smg *SMGeneric) replicateSessions(originID string) (err error) { +func (smg *SMGeneric) replicateSessions(cgrID string) (err error) { if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { return } smg.aSessionsMux.RLock() - aSessions := smg.activeSessions[originID] + aSessions := smg.activeSessions[cgrID] smg.aSessionsMux.RUnlock() var wg sync.WaitGroup for _, rplConn := range smg.smgReplConns { @@ -390,7 +392,7 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) { } go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) { var reply string - argSet := ArgsSetPassiveSessions{OriginID: originID, Sessions: ss} + argSet := ArgsSetPassiveSessions{CGRID: cgrID, Sessions: ss} conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply) if sync { wg.Done() @@ -402,12 +404,12 @@ func (smg *SMGeneric) replicateSessions(originID string) (err error) { } // sessionActiveToPassive is a mechanism to transit a session from active to passive state -func (smg *SMGeneric) sessionActiveToPassive(originID string) (err error) { +func (smg *SMGeneric) sessionActiveToPassive(cgrID string) (err error) { return } // sessionPassiveToActive is a mechanism to transit a session from passive to active state -func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) { +func (smg *SMGeneric) sessionPassiveToActive(cgrID string) (err error) { return } @@ -415,7 +417,7 @@ func (smg *SMGeneric) sessionPassiveToActive(originID string) (err error) { // MaxUsage calculates maximum usage allowed for given gevent func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err error) { - cacheKey := "MaxUsage" + gev.GetCgrId(smg.timezone) + cacheKey := "MaxUsage" + gev.GetCGRID(utils.META_DEFAULT) if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return (item.Value.(time.Duration)), item.Err } @@ -423,7 +425,7 @@ func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err gev[utils.EVENT_NAME] = utils.CGR_AUTHORIZATION storedCdr := gev.AsStoredCdr(config.CgrConfig(), smg.timezone) var maxDur float64 - if err = smg.rater.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil { + if err = smg.rals.Call("Responder.GetDerivedMaxSessionTime", storedCdr, &maxDur); err != nil { return } maxUsage = time.Duration(maxDur) @@ -431,7 +433,7 @@ func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (maxUsage time.Duration, err } func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err error) { - cacheKey := "LCRSuppliers" + gev.GetCgrId(smg.timezone) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT) + cacheKey := "LCRSuppliers" + gev.GetCGRID(utils.META_DEFAULT) + gev.GetAccount(utils.META_DEFAULT) + gev.GetDestination(utils.META_DEFAULT) if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { if item.Value != nil { suppls = (item.Value.([]string)) @@ -443,12 +445,12 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err err gev[utils.EVENT_NAME] = utils.CGR_LCR_REQUEST var cd *engine.CallDescriptor cd, err = gev.AsLcrRequest().AsCallDescriptor(smg.timezone) - cd.CgrID = gev.GetCgrId(smg.timezone) + cd.CgrID = gev.GetCGRID(utils.META_DEFAULT) if err != nil { return } var lcr engine.LCRCost - if err = smg.rater.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { + if err = smg.rals.Call("Responder.GetLCR", &engine.AttrGetLcr{CallDescriptor: cd}, &lcr); err != nil { return } if lcr.HasErrors() { @@ -462,13 +464,14 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) (suppls []string, err err // Called on session start func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { - cacheKey := "InitiateSession" + gev.GetCgrId(smg.timezone) + cgrID := gev.GetCGRID(utils.META_DEFAULT) + cacheKey := "InitiateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err } defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching if err = smg.sessionStart(gev, clnt); err != nil { - smg.sessionEnd(gev.GetUUID(), 0) + smg.sessionEnd(cgrID, 0) return } if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Session handled by debit loop @@ -477,14 +480,15 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie } maxUsage, err = smg.UpdateSession(gev, clnt) if err != nil || maxUsage == 0 { - smg.sessionEnd(gev.GetUUID(), 0) + smg.sessionEnd(cgrID, 0) } return } // Execute debits for usage/maxUsage func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (maxUsage time.Duration, err error) { - cacheKey := "UpdateSession" + gev.GetCgrId(smg.timezone) + cgrID := gev.GetCGRID(utils.META_DEFAULT) + cacheKey := "UpdateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err } @@ -493,10 +497,11 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient err = errors.New("ACTIVE_DEBIT_LOOP") return } - defer smg.replicateSessions(gev.GetUUID()) - if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil { - defer smg.replicateSessions(initialID) - err = smg.sessionRelocate(gev.GetUUID(), initialID) + defer smg.replicateSessions(gev.GetCGRID(utils.META_DEFAULT)) + if gev.HasField(utils.InitialOriginID) { + initialCGRID := gev.GetCGRID(utils.InitialOriginID) + defer smg.replicateSessions(initialCGRID) + err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT)) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) } @@ -504,7 +509,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient return } } - smg.resetTerminatorTimer(gev.GetUUID(), gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage()) + smg.resetTerminatorTimer(cgrID, gev.GetSessionTTL(), gev.GetSessionTTLLastUsed(), gev.GetSessionTTLUsage()) var lastUsed *time.Duration var evLastUsed time.Duration if evLastUsed, err = gev.GetLastUsed(utils.META_DEFAULT); err == nil { @@ -518,9 +523,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient } return } - aSessions := smg.getASession(gev.GetUUID()) + aSessions := smg.getASession(cgrID) if len(aSessions) == 0 { - utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", gev.GetUUID())) + utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", cgrID)) err = utils.ErrServerError return } @@ -537,14 +542,16 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient // Called on session end, should stop debit loop func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (err error) { - cacheKey := "TerminateSession" + gev.GetCgrId(smg.timezone) + cgrID := gev.GetCGRID(utils.META_DEFAULT) + cacheKey := "TerminateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Err } defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Err: err}) - if initialID, errGet := gev.GetFieldAsString(utils.InitialOriginID); errGet == nil { - err = smg.sessionRelocate(gev.GetUUID(), initialID) - defer smg.replicateSessions(initialID) + if gev.HasField(utils.InitialOriginID) { + initialCGRID := gev.GetCGRID(utils.InitialOriginID) + defer smg.replicateSessions(initialCGRID) + err = smg.sessionRelocate(initialCGRID, cgrID, gev.GetOriginID(utils.META_DEFAULT)) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) } @@ -552,9 +559,11 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli return } } - sessionIDs := []string{gev.GetUUID()} - if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil { // OriginIDPrefix is present, OriginID will not be anymore considered - sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix) + sessionIDs := []string{cgrID} + if gev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered + if sessionIDPrefix, errPrefix := gev.GetFieldAsString(utils.OriginIDPrefix); errPrefix == nil { + sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix) + } } usage, errUsage := gev.GetUsage(utils.META_DEFAULT) var lastUsed time.Duration @@ -598,13 +607,14 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli // Processes one time events (eg: SMS) func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, err error) { - cacheKey := "ChargeEvent" + gev.GetCgrId(smg.timezone) + cgrID := gev.GetCGRID(utils.META_DEFAULT) + cacheKey := "ChargeEvent" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err } defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) var sessionRuns []*engine.SessionRun - if err = smg.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { + if err = smg.rals.Call("Responder.GetSessionRuns", gev.AsStoredCdr(smg.cgrCfg, smg.timezone), &sessionRuns); err != nil { return } else if len(sessionRuns) == 0 { return @@ -612,7 +622,7 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e var maxDurInit bool // Avoid differences between default 0 and received 0 for _, sR := range sessionRuns { cc := new(engine.CallCost) - if err = smg.rater.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil { + if err = smg.rals.Call("Responder.MaxDebit", sR.CallDescriptor, cc); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not Debit CD: %+v, RunID: %s, error: %s", sR.CallDescriptor, sR.DerivedCharger.RunID, err.Error())) break } @@ -645,12 +655,12 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e if len(refundIncrements) > 0 { cd := cc.CreateCallDescriptor() cd.Increments = refundIncrements - cd.CgrID = sR.CallDescriptor.CgrID + cd.CgrID = cgrID cd.RunID = sR.CallDescriptor.RunID cd.Increments.Compress() //utils.Logger.Info(fmt.Sprintf("Refunding session run callcost: %s", utils.ToJSON(cd))) var response float64 - err = smg.rater.Call("Responder.RefundIncrements", cd, &response) + err = smg.rals.Call("Responder.RefundIncrements", cd, &response) if err != nil { return } @@ -675,17 +685,17 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e cd := cc.CreateCallDescriptor() cd.Increments = roundIncrements var response float64 - if errRefund := smg.rater.Call("Responder.RefundRounding", cd, &response); errRefund != nil { + if errRefund := smg.rals.Call("Responder.RefundRounding", cd, &response); errRefund != nil { utils.Logger.Err(fmt.Sprintf(" ERROR failed to refund rounding: %v", errRefund)) } } var reply string smCost := &engine.SMCost{ - CGRID: gev.GetCgrId(smg.timezone), + CGRID: cgrID, CostSource: utils.SESSION_MANAGER_SOURCE, RunID: sR.DerivedCharger.RunID, OriginHost: gev.GetOriginatorIP(utils.META_DEFAULT), - OriginID: gev.GetUUID(), + OriginID: gev.GetOriginID(utils.META_DEFAULT), CostDetails: cc, } if errStore := smg.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, @@ -702,7 +712,8 @@ func (smg *SMGeneric) ChargeEvent(gev SMGenericEvent) (maxUsage time.Duration, e } func (smg *SMGeneric) ProcessCDR(gev SMGenericEvent) (err error) { - cacheKey := "ProcessCDR" + gev.GetCgrId(smg.timezone) + cgrID := gev.GetCGRID(utils.META_DEFAULT) + cacheKey := "ProcessCDR" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Err } @@ -778,14 +789,14 @@ func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSess return } -func (smg *SMGeneric) getPassiveSessions(originID, runID string) (pss map[string][]*SMGSession) { +func (smg *SMGeneric) getPassiveSessions(cgrID, runID string) (pss map[string][]*SMGSession) { smg.pSessionsMux.RLock() - if originID == "" { + if cgrID == "" { if len(smg.passiveSessions) != 0 { pss = smg.passiveSessions } } else { - pSSlc := smg.passiveSessions[originID] + pSSlc := smg.passiveSessions[cgrID] if runID != "" { var found bool for _, s := range pSSlc { @@ -799,7 +810,7 @@ func (smg *SMGeneric) getPassiveSessions(originID, runID string) (pss map[string } } if len(pSSlc) != 0 { - pss = map[string][]*SMGSession{originID: pSSlc} + pss = map[string][]*SMGSession{cgrID: pSSlc} } } smg.pSessionsMux.RUnlock() @@ -889,7 +900,7 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, return nil } -// Interim updates, returns remaining duration from the rater +// Interim updates, returns remaining duration from the RALs func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error { if minMaxUsage, err := smg.UpdateSession(ev, clnt); err != nil { return utils.NewErrServerError(err) @@ -946,7 +957,7 @@ func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(attrs utils.AttrSMGGetActiveSes } type ArgsSetPassiveSessions struct { - OriginID string + CGRID string Sessions []*SMGSession } @@ -954,9 +965,9 @@ type ArgsSetPassiveSessions struct { func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, reply *string) error { smg.pSessionsMux.Lock() if len(args.Sessions) == 0 { // Remove - delete(smg.passiveSessions, args.OriginID) + delete(smg.passiveSessions, args.CGRID) } else { // Set with overwrite - smg.passiveSessions[args.OriginID] = args.Sessions + smg.passiveSessions[args.CGRID] = args.Sessions } smg.pSessionsMux.Unlock() *reply = utils.OK @@ -964,15 +975,15 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, rep } type ArgsGetPassiveSessions struct { - OriginID string - RunID string + CGRID string + RunID string } func (smg *SMGeneric) BiRPCV1GetPassiveSessions(attrs ArgsGetPassiveSessions, pSessions *map[string][]*SMGSession) error { - if attrs.RunID != "" && attrs.OriginID == "" { + if attrs.RunID != "" && attrs.CGRID == "" { return utils.ErrMandatoryIeMissing } - pSS := smg.getPassiveSessions(attrs.OriginID, attrs.RunID) + pSS := smg.getPassiveSessions(attrs.CGRID, attrs.RunID) if len(pSS) == 0 { return utils.ErrNotFound } diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index 7f6366b07..8bdd6fddf 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -29,7 +29,8 @@ var smgCfg *config.CGRConfig func init() { smgCfg, _ = config.NewDefaultCGRConfig() - smgCfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"} + smgCfg.SmGenericConfig.SessionIndexes = utils.StringMap{"Tenant": true, + "Account": true, "Extra3": true, "Extra4": true} } @@ -59,28 +60,33 @@ func TestSMGSessionIndexing(t *testing.T) { "Extra3": "", } // Index first session - smgSession := &SMGSession{EventStart: smGev} - uuid := smGev.GetUUID() - smg.indexASession(uuid, smgSession) + smgSession := &SMGSession{CGRID: smGev.GetCGRID(utils.META_DEFAULT), EventStart: smGev} + cgrID := smGev.GetCGRID(utils.META_DEFAULT) + smg.indexASession(smgSession) eIndexes := map[string]map[string]utils.StringMap{ + "OriginID": map[string]utils.StringMap{ + "12345": utils.StringMap{ + cgrID: true, + }, + }, "Tenant": map[string]utils.StringMap{ "cgrates.org": utils.StringMap{ - uuid: true, + cgrID: true, }, }, "Account": map[string]utils.StringMap{ "account1": utils.StringMap{ - uuid: true, + cgrID: true, }, }, "Extra3": map[string]utils.StringMap{ utils.MetaEmpty: utils.StringMap{ - uuid: true, + cgrID: true, }, }, "Extra4": map[string]utils.StringMap{ utils.NOT_AVAILABLE: utils.StringMap{ - uuid: true, + cgrID: true, }, }, } @@ -98,38 +104,46 @@ func TestSMGSessionIndexing(t *testing.T) { "Extra3": "", "Extra4": "info2", } - uuid2 := smGev2.GetUUID() - smgSession2 := &SMGSession{EventStart: smGev2} - smg.indexASession(uuid2, smgSession2) + cgrID2 := smGev2.GetCGRID(utils.META_DEFAULT) + smgSession2 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2} + smg.indexASession(smgSession2) eIndexes = map[string]map[string]utils.StringMap{ + "OriginID": map[string]utils.StringMap{ + "12345": utils.StringMap{ + cgrID: true, + }, + "12346": utils.StringMap{ + cgrID2: true, + }, + }, "Tenant": map[string]utils.StringMap{ "cgrates.org": utils.StringMap{ - uuid: true, + cgrID: true, }, "itsyscom.com": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, "Account": map[string]utils.StringMap{ "account1": utils.StringMap{ - uuid: true, + cgrID: true, }, "account2": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, "Extra3": map[string]utils.StringMap{ utils.MetaEmpty: utils.StringMap{ - uuid: true, - uuid2: true, + cgrID: true, + cgrID2: true, }, }, "Extra4": map[string]utils.StringMap{ utils.NOT_AVAILABLE: utils.StringMap{ - uuid: true, + cgrID: true, }, "info2": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, } @@ -137,26 +151,31 @@ func TestSMGSessionIndexing(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.aSessionsIndex) } // Unidex first session - smg.unindexASession(uuid) + smg.unindexASession(cgrID) eIndexes = map[string]map[string]utils.StringMap{ + "OriginID": map[string]utils.StringMap{ + "12346": utils.StringMap{ + cgrID2: true, + }, + }, "Tenant": map[string]utils.StringMap{ "itsyscom.com": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, "Account": map[string]utils.StringMap{ "account2": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, "Extra3": map[string]utils.StringMap{ utils.MetaEmpty: utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, "Extra4": map[string]utils.StringMap{ "info2": utils.StringMap{ - uuid2: true, + cgrID2: true, }, }, } @@ -190,7 +209,7 @@ func TestSMGActiveSessions(t *testing.T) { "Extra2": 5, "Extra3": "", } - smg.recordASession(smGev1.GetUUID(), &SMGSession{EventStart: smGev1}) + smg.recordASession(&SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1}) smGev2 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -211,7 +230,7 @@ func TestSMGActiveSessions(t *testing.T) { "Extra1": "Value1", "Extra3": "extra3", } - smg.recordASession(smGev2.GetUUID(), &SMGSession{EventStart: smGev2}) + smg.recordASession(&SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2}) if aSessions, _, err := smg.ActiveSessions(nil, false); err != nil { t.Error(err) } else if len(aSessions) != 2 { @@ -268,8 +287,9 @@ func TestGetPassiveSessions(t *testing.T) { "Extra3": "", } // Index first session - smgSession11 := &SMGSession{EventStart: smGev1, RunID: utils.META_DEFAULT} - smgSession12 := &SMGSession{EventStart: smGev1, RunID: "second_run"} + smgSession11 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: utils.META_DEFAULT} + smgSession12 := &SMGSession{CGRID: smGev1.GetCGRID(utils.META_DEFAULT), EventStart: smGev1, RunID: "second_run"} + smg.passiveSessions[smgSession11.CGRID] = []*SMGSession{smgSession11, smgSession12} smGev2 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -293,19 +313,18 @@ func TestGetPassiveSessions(t *testing.T) { "Extra2": 5, "Extra3": "", } - smgSession21 := &SMGSession{EventStart: smGev2, RunID: utils.META_DEFAULT} - smg.passiveSessions[smGev1.GetUUID()] = []*SMGSession{smgSession11, smgSession12} if pSS := smg.getPassiveSessions("", ""); len(pSS) != 1 { t.Errorf("PassiveSessions: %+v", pSS) } - smg.passiveSessions[smGev2.GetUUID()] = []*SMGSession{smgSession21} + smgSession21 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2, RunID: utils.META_DEFAULT} + smg.passiveSessions[smgSession21.CGRID] = []*SMGSession{smgSession21} if pSS := smg.getPassiveSessions("", ""); len(pSS) != 2 { t.Errorf("PassiveSessions: %+v", pSS) } - if pSS := smg.getPassiveSessions(smGev1.GetUUID(), ""); len(pSS) != 1 || len(pSS[smGev1.GetUUID()]) != 2 { + if pSS := smg.getPassiveSessions(smgSession11.CGRID, ""); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 2 { t.Errorf("PassiveSessions: %+v", pSS) } - if pSS := smg.getPassiveSessions(smGev1.GetUUID(), smgSession12.RunID); len(pSS) != 1 || len(pSS[smGev1.GetUUID()]) != 1 { + if pSS := smg.getPassiveSessions(smgSession11.CGRID, smgSession12.RunID); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 1 { t.Errorf("PassiveSessions: %+v", pSS) } if pSS := smg.getPassiveSessions("aabbcc", ""); len(pSS) != 0 { diff --git a/sessionmanager/smgreplc_it_test.go b/sessionmanager/smgreplc_it_test.go index 11498e6e4..dca25afdc 100644 --- a/sessionmanager/smgreplc_it_test.go +++ b/sessionmanager/smgreplc_it_test.go @@ -132,14 +132,15 @@ func TestSMGRplcInitiate(t *testing.T) { t.Error("Bad max usage: ", maxUsage) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated + cgrID := smgEv.GetCGRID(utils.META_DEFAULT) if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { t.Errorf("PassiveSessions: %+v", pSessions) - } else if _, hasOriginID := pSessions[smgEv.GetUUID()]; !hasOriginID { + } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[smgEv.GetUUID()][0].TotalUsage != time.Duration(90*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[smgEv.GetUUID()][0]) + } else if pSessions[cgrID][0].TotalUsage != time.Duration(90*time.Second) { + t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) } } @@ -160,15 +161,16 @@ func TestSMGRplcUpdate(t *testing.T) { t.Error("Bad max usage: ", maxUsage) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated + cgrID := smgEv.GetCGRID(utils.META_DEFAULT) var pSessions map[string][]*SMGSession if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { t.Errorf("PassiveSessions: %+v", pSessions) - } else if _, hasOriginID := pSessions[smgEv.GetUUID()]; !hasOriginID { + } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[smgEv.GetUUID()][0].TotalUsage != time.Duration(150*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[smgEv.GetUUID()][0]) + } else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) { + t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) } }