From ea9177d77ba2adca4287a0791c06229beb4180f5 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 3 Apr 2017 11:20:50 +0200 Subject: [PATCH 1/4] Upgrading rpcclient for RPCCloner interface --- glide.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.lock b/glide.lock index d422d1bc1..f748059df 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - name: github.com/cgrates/osipsdagram version: 3d6beed663452471dec3ca194137a30d379d9e8f - name: github.com/cgrates/rpcclient - version: 80f4346e7b309760e445f0424f0e0ff38538f196 + version: dddae42e9344e877627cd4b7aba075d63b452c0b - name: github.com/ChrisTrenkamp/goxpath version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6 subpackages: From e452d455f4ba433d43844e01985288d9e6597779 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 3 Apr 2017 13:18:13 +0200 Subject: [PATCH 2/4] SMG replication with session cloning to avoid concurrency on slow replication --- sessionmanager/smg_event_test.go | 3 --- sessionmanager/smg_session.go | 3 ++- sessionmanager/smgeneric.go | 13 ++++++++++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sessionmanager/smg_event_test.go b/sessionmanager/smg_event_test.go index 2b2143d0b..249cd0235 100644 --- a/sessionmanager/smg_event_test.go +++ b/sessionmanager/smg_event_test.go @@ -18,7 +18,6 @@ along with this program. If not, see package sessionmanager import ( - "fmt" "reflect" "testing" "time" @@ -148,8 +147,6 @@ func TestSMGenericEventGetSessionTTL(t *testing.T) { sesTTLMaxDelay := time.Duration(10 * time.Second) if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), &sesTTLMaxDelay); sTTL == eSesTTL || sTTL > eSesTTL+sesTTLMaxDelay { t.Errorf("Received: %v", sTTL) - } else { - fmt.Println(sTTL) } } diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 257ff9925..8645ee926 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "strconv" + "sync" "time" "github.com/cgrates/cgrates/engine" @@ -31,6 +32,7 @@ import ( // One session handled by SM type SMGSession struct { + mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed CGRID string // Unique identifier for this session EventStart SMGenericEvent // Event which started stopDebit chan struct{} // Channel to communicate with debit loops when closing the session @@ -142,7 +144,6 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. // Attempts to refund a duration, error on failure func (self *SMGSession) refund(refundDuration time.Duration) error { - if refundDuration == 0 { // Nothing to refund return nil } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 733a93b69..76b46db77 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -471,7 +471,18 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool } ssMux.RLock() ss := ssMp[cgrID] + if len(ss) != 0 { + ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock + } ssMux.RUnlock() + var ssCln []*SMGSession + err = utils.Clone(ss, &ssCln) + if len(ss) != 0 { + ss[0].mux.RUnlock() + } + if err != nil { + return + } var wg sync.WaitGroup for _, rplConn := range smgReplConns { if rplConn.Synchronous { @@ -484,7 +495,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool if sync { wg.Done() } - }(rplConn.Connection, rplConn.Synchronous, ss) + }(rplConn.Connection, rplConn.Synchronous, ssCln) } wg.Wait() // wait for synchronous replication to finish return From 087ef7c7e791f09c0296deb5d778c39f73d77407 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 3 Apr 2017 16:57:06 +0200 Subject: [PATCH 3/4] Diameter fixes to cope with application type=auth, diameter tests back on track --- agents/dmtagent_it_test.go | 2 +- agents/dmtclient.go | 5 +++-- data/diameter/dict/huawei/3gpp_vendor.xml | 11 +---------- data/diameter/dict/huawei/base.xml | 2 +- data/diameter/dict/huawei/huawei.xml | 2 +- data/diameter/dict/huawei/nasreq.xml | 2 +- data/diameter/dict/huawei/nokia.xml | 2 +- data/diameter/dict/huawei/vodafone.xml | 2 +- 8 files changed, 10 insertions(+), 18 deletions(-) diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 201ac4cb7..3d662fb8d 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -191,7 +191,7 @@ func TestDmtAgentTPFromFolder(t *testing.T) { time.Sleep(time.Duration(1000) * time.Millisecond) // Give time for scheduler to execute topups } -func TestConnectDiameterClient(t *testing.T) { +func TestDmtAgentConnectDiameterClient(t *testing.T) { dmtClient, err = NewDiameterClient(daCfg.DiameterAgentCfg().Listen, "UNIT_TEST", daCfg.DiameterAgentCfg().OriginRealm, daCfg.DiameterAgentCfg().VendorId, daCfg.DiameterAgentCfg().ProductName, utils.DIAMETER_FIRMWARE_REVISION, daCfg.DiameterAgentCfg().DictionariesDir) if err != nil { diff --git a/agents/dmtclient.go b/agents/dmtclient.go index 8abfb954b..528ca3487 100644 --- a/agents/dmtclient.go +++ b/agents/dmtclient.go @@ -49,8 +49,9 @@ func NewDiameterClient(addr, originHost, originRealm string, vendorId int, produ RetransmitInterval: time.Second, EnableWatchdog: true, WatchdogInterval: 5 * time.Second, - AcctApplicationID: []*diam.AVP{ - diam.NewAVP(avp.AcctApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)), // RFC 4006 + AuthApplicationID: []*diam.AVP{ + // Advertise support for credit control application + diam.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)), // RFC 4006 }, } if len(dictsDir) != 0 { diff --git a/data/diameter/dict/huawei/3gpp_vendor.xml b/data/diameter/dict/huawei/3gpp_vendor.xml index 4d8968b60..aed4d457a 100644 --- a/data/diameter/dict/huawei/3gpp_vendor.xml +++ b/data/diameter/dict/huawei/3gpp_vendor.xml @@ -1,6 +1,6 @@ - + @@ -1373,15 +1373,6 @@ - - - - - - - - - diff --git a/data/diameter/dict/huawei/base.xml b/data/diameter/dict/huawei/base.xml index 5828f6625..935896ad8 100644 --- a/data/diameter/dict/huawei/base.xml +++ b/data/diameter/dict/huawei/base.xml @@ -1,6 +1,6 @@ - + diff --git a/data/diameter/dict/huawei/huawei.xml b/data/diameter/dict/huawei/huawei.xml index c70ee3679..6866a0415 100644 --- a/data/diameter/dict/huawei/huawei.xml +++ b/data/diameter/dict/huawei/huawei.xml @@ -1,6 +1,6 @@ - + diff --git a/data/diameter/dict/huawei/nasreq.xml b/data/diameter/dict/huawei/nasreq.xml index 1384d4a04..f4b5d601a 100644 --- a/data/diameter/dict/huawei/nasreq.xml +++ b/data/diameter/dict/huawei/nasreq.xml @@ -1,6 +1,6 @@ - + diff --git a/data/diameter/dict/huawei/nokia.xml b/data/diameter/dict/huawei/nokia.xml index 79bb31a77..7bc44ef86 100644 --- a/data/diameter/dict/huawei/nokia.xml +++ b/data/diameter/dict/huawei/nokia.xml @@ -1,6 +1,6 @@ - + diff --git a/data/diameter/dict/huawei/vodafone.xml b/data/diameter/dict/huawei/vodafone.xml index f8b92bc52..3ff8e76a1 100644 --- a/data/diameter/dict/huawei/vodafone.xml +++ b/data/diameter/dict/huawei/vodafone.xml @@ -1,6 +1,6 @@ - + From 057a7fd4207c6dec05d10dbecfe664396f1db00e Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 3 Apr 2017 19:31:14 +0200 Subject: [PATCH 4/4] SMGeneric - locks for SMGSession --- sessionmanager/smg_session.go | 8 ++++++++ sessionmanager/smgeneric.go | 2 ++ 2 files changed, 10 insertions(+) diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 8645ee926..9b7b77b43 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -84,6 +84,8 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) { // Attempts to debit a duration, returns maximum duration which can be debitted or error func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.Duration, error) { + self.mux.Lock() + defer self.mux.Unlock() requestedDuration := dur if lastUsed != nil { self.ExtraDuration = self.LastDebit - *lastUsed @@ -219,6 +221,8 @@ func (self *SMGSession) mergeCCs() { // Session has ended, check debits and refund the extra charged duration func (self *SMGSession) close(endTime time.Time) (err error) { + self.mux.Lock() + defer self.mux.Unlock() if len(self.CallCosts) != 0 { // We have had at least one cost calculation chargedEndTime := self.CallCosts[len(self.CallCosts)-1].GetEndTime() if endTime.After(chargedEndTime) { // we did not charge enough, make a manual debit here @@ -263,6 +267,8 @@ func (self *SMGSession) saveOperations(cgrID string) error { if len(self.CallCosts) == 0 { return nil // There are no costs to save, ignore the operation } + self.mux.Lock() + self.mux.Unlock() cc := self.CallCosts[0] // was merged in close method cc.Round() roundIncrements := cc.GetRoundIncrements() @@ -318,6 +324,8 @@ func (self *SMGSession) storeSMCost(smCost *engine.SMCost) error { } func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession { + self.mux.RLock() + defer self.mux.RUnlock() sTime, _ := self.EventStart.GetSetupTime(utils.META_DEFAULT, timezone) aTime, _ := self.EventStart.GetAnswerTime(utils.META_DEFAULT, timezone) pdd, _ := self.EventStart.GetPdd(utils.META_DEFAULT) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 76b46db77..b84780761 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -229,6 +229,8 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) { } idxMux.Lock() defer idxMux.Unlock() + s.mux.RLock() + defer s.mux.RUnlock() for fieldName := range smg.ssIdxCfg { fieldVal, err := utils.ReflectFieldAsString(s.EventStart, fieldName, "") if err != nil {