diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go
index ac2ac003a..9d8e74194 100644
--- a/cdrc/partial_cdr.go
+++ b/cdrc/partial_cdr.go
@@ -54,7 +54,7 @@ type PartialRecordsCache struct {
cdrs rpcclient.RpcClientConnection
partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord
dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset
- guard *guardian.GuardianLock
+ guard *guardian.GuardianLocker
}
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
diff --git a/cdrc/unpairedrecords.go b/cdrc/unpairedrecords.go
index c32a6eb77..4ed8a62bb 100644
--- a/cdrc/unpairedrecords.go
+++ b/cdrc/unpairedrecords.go
@@ -41,7 +41,7 @@ type UnpairedRecordsCache struct {
cdrOutDir string
csvSep rune
partialRecords map[string]map[string]*UnpairedRecord // [FileName"][OriginID]*PartialRecord
- guard *guardian.GuardianLock
+ guard *guardian.GuardianLocker
}
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go
index 045e6bae0..6f9b26ffa 100644
--- a/cmd/cgr-tester/cgr-tester.go
+++ b/cmd/cgr-tester/cgr-tester.go
@@ -58,6 +58,8 @@ var (
version = flag.Bool("version", false, "Prints the application version.")
nilDuration = time.Duration(0)
usage = flag.String("usage", "1m", "The duration to use in call simulation.")
+ fPath = flag.String("file_path", "", "read requests from file with path")
+ reqSep = flag.String("req_separator", "\n\n", "separator for requests in file")
)
func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
@@ -154,6 +156,18 @@ func main() {
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
+ if *fPath != "" {
+ frt, err := NewFileReaderTester(*fPath, *raterAddress,
+ *parallel, *runs, []byte(*reqSep))
+ if err != nil {
+ log.Fatal(err)
+ }
+ if err := frt.Test(); err != nil {
+ log.Fatal(err)
+ }
+ return
+ }
+
var timeparsed time.Duration
var err error
tstart := time.Now()
diff --git a/cmd/cgr-tester/filereader.go b/cmd/cgr-tester/filereader.go
new file mode 100644
index 000000000..bf956f7ea
--- /dev/null
+++ b/cmd/cgr-tester/filereader.go
@@ -0,0 +1,104 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "io"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net"
+ "os"
+ "sync"
+ "time"
+)
+
+func NewFileReaderTester(fPath, cgrAddr string, parallel, runs int, reqSep []byte) (frt *FileReaderTester, err error) {
+ frt = &FileReaderTester{
+ parallel: parallel, runs: runs,
+ reqSep: reqSep,
+ }
+ if frt.rdr, err = os.Open(fPath); err != nil {
+ return nil, err
+ }
+ if frt.conn, err = net.Dial("tcp", cgrAddr); err != nil {
+ return nil, err
+ }
+ return
+}
+
+// TesterReader will read requests from file and post them remotely
+type FileReaderTester struct {
+ parallel int
+ runs int
+ reqSep []byte
+
+ rdr io.Reader
+ conn net.Conn
+ connScnr *bufio.Scanner
+}
+
+func (frt *FileReaderTester) connSendReq(req []byte) (err error) {
+ frt.conn.SetReadDeadline(time.Now().Add(time.Millisecond)) // will block most of the times on read
+ if _, err = frt.conn.Write(req); err != nil {
+ return
+ }
+ ioutil.ReadAll(frt.conn)
+ return
+}
+
+// Test reads from rdr, split the content based on lineSep and sends individual lines to remote
+func (frt *FileReaderTester) Test() (err error) {
+ var fContent []byte
+ if fContent, err = ioutil.ReadAll(frt.rdr); err != nil {
+ return
+ }
+ reqs := bytes.Split(fContent, frt.reqSep)
+
+ // parallel requests
+ if frt.parallel > 0 {
+ var wg sync.WaitGroup
+ reqLimiter := make(chan struct{}, frt.parallel)
+ for i := 0; i < frt.runs; i++ {
+ wg.Add(1)
+ go func(i int) {
+ reqLimiter <- struct{}{} // block till buffer will allow
+ if err := frt.connSendReq(reqs[rand.Intn(len(reqs))]); err != nil {
+ log.Printf("ERROR: %s", err.Error())
+ }
+ <-reqLimiter // release one request from buffer
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ return
+ }
+
+ // serial requests
+ for i := 0; i < frt.runs; i++ {
+ for _, req := range reqs {
+ if err := frt.connSendReq(req); err != nil {
+ log.Printf("ERROR: %s", err.Error())
+ }
+ }
+ }
+ return
+}
diff --git a/config/config_defaults.go b/config/config_defaults.go
index 9ba4bccf9..374e02336 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -47,7 +47,7 @@ const CGRATES_CFG_JSON = `
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
- "locking_timeout": "5s", // timeout internal locks to avoid deadlocks
+ "locking_timeout": "0", // timeout internal locks to avoid deadlocks
},
diff --git a/config/config_json_test.go b/config/config_json_test.go
index 51c5a6ae4..cfa482c08 100755
--- a/config/config_json_test.go
+++ b/config/config_json_test.go
@@ -57,7 +57,7 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Reply_timeout: utils.StringPointer("2s"),
Response_cache_ttl: utils.StringPointer("0s"),
Internal_ttl: utils.StringPointer("2m"),
- Locking_timeout: utils.StringPointer("5s"),
+ Locking_timeout: utils.StringPointer("0"),
}
if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil {
t.Error(err)
diff --git a/config/config_test.go b/config/config_test.go
index 9fe38f47b..faea43bea 100755
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -208,7 +208,7 @@ func TestCgrCfgJSONDefaultsGeneral(t *testing.T) {
if cgrCfg.InternalTtl != 2*time.Minute {
t.Error(cgrCfg.InternalTtl)
}
- if cgrCfg.LockingTimeout != 5*time.Second {
+ if cgrCfg.LockingTimeout != 0 {
t.Error(cgrCfg.LockingTimeout)
}
if cgrCfg.Logger != utils.MetaSysLog {
diff --git a/engine/cdrs.go b/engine/cdrs.go
index fb0624d6c..2811ca95e 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -112,7 +112,7 @@ type CdrServer struct {
cdrstats rpcclient.RpcClientConnection
thdS rpcclient.RpcClientConnection
stats rpcclient.RpcClientConnection
- guard *guardian.GuardianLock
+ guard *guardian.GuardianLocker
responseCache *utils.ResponseCache
httpPoster *utils.HTTPPoster // used for replication
}
diff --git a/guardian/guardian.go b/guardian/guardian.go
index 52045da30..f3ee3033d 100644
--- a/guardian/guardian.go
+++ b/guardian/guardian.go
@@ -19,44 +19,52 @@ along with this program. If not, see
package guardian
import (
+ "fmt"
"sync"
- "sync/atomic"
"time"
)
// global package variable
-var Guardian = &GuardianLock{locksMap: make(map[string]*itemLock)}
+var Guardian = &GuardianLocker{locksMap: make(map[string]*itemLock)}
-func newItemLock(keyID string) *itemLock {
- return &itemLock{keyID: keyID}
+func newItemLock(keyID string) (il *itemLock) {
+ il = &itemLock{keyID: keyID}
+ il.lock() // need to return it already locked so we don't have concurrency on creation/unlock
+ return
}
// itemLock represents one lock with key autodestroy
type itemLock struct {
- keyID string // store it so we know what to destroy
- cnt int64
- sync.Mutex
+ keyID string // store it so we know what to destroy
+ cnt int64
+ cntLck sync.Mutex // protect the counter
+ lk sync.Mutex // real lock
}
// lock() executes combined lock with increasing counter
func (il *itemLock) lock() {
- atomic.AddInt64(&il.cnt, 1)
- il.Lock()
+ il.cntLck.Lock()
+ il.cnt += 1
+ il.cntLck.Unlock()
+ il.lk.Lock()
}
// unlock() executes combined lock with autoremoving lock from Guardian
func (il *itemLock) unlock() {
- atomic.AddInt64(&il.cnt, -1)
- cnt := atomic.LoadInt64(&il.cnt)
- if cnt < 0 { // already unlocked
+ il.cntLck.Lock()
+ if il.cnt < 1 { // already unlocked
+ fmt.Sprintf(" itemLock with id: %s with counter smaller than 0", il.keyID)
+ il.cntLck.Unlock()
return
}
- if cnt == 0 { // last lock in the queue
+ il.cnt -= 1
+ if il.cnt == 0 { // last lock in the queue
Guardian.Lock()
delete(Guardian.locksMap, il.keyID)
Guardian.Unlock()
}
- il.Unlock()
+ il.lk.Unlock()
+ il.cntLck.Unlock()
}
type itemLocks []*itemLock
@@ -73,33 +81,34 @@ func (ils itemLocks) unlock() {
}
}
-// GuardianLock is an optimized locking system per locking key
-type GuardianLock struct {
+// GuardianLocker is an optimized locking system per locking key
+type GuardianLocker struct {
locksMap map[string]*itemLock
sync.RWMutex // protects the maps
}
// lockItems locks a set of lockIDs
// returning the lock structs so they can be later unlocked
-func (guard *GuardianLock) lockItems(lockIDs []string) (itmLocks itemLocks) {
+func (guard *GuardianLocker) lockItems(lockIDs []string) (itmLocks itemLocks) {
guard.Lock()
+ var toLockItms itemLocks
for _, lockID := range lockIDs {
- var itmLock *itemLock
itmLock, exists := guard.locksMap[lockID]
if !exists {
itmLock = newItemLock(lockID)
guard.locksMap[lockID] = itmLock
+ } else {
+ toLockItms = append(toLockItms, itmLock)
}
itmLocks = append(itmLocks, itmLock)
}
guard.Unlock()
-
- itmLocks.lock()
+ toLockItms.lock()
return
}
// Guard executes the handler between locks
-func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) {
+func (guard *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) {
itmLocks := guard.lockItems(lockIDs)
rplyChan := make(chan interface{})
@@ -131,7 +140,7 @@ func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout ti
}
// GuardTimed aquires a lock for duration
-func (guard *GuardianLock) GuardIDs(timeout time.Duration, lockIDs ...string) {
+func (guard *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) {
guard.lockItems(lockIDs)
if timeout != 0 {
go func(timeout time.Duration, lockIDs ...string) {
@@ -143,7 +152,7 @@ func (guard *GuardianLock) GuardIDs(timeout time.Duration, lockIDs ...string) {
}
// UnguardTimed attempts to unlock a set of locks based on their locksUUID
-func (guard *GuardianLock) UnguardIDs(lockIDs ...string) {
+func (guard *GuardianLocker) UnguardIDs(lockIDs ...string) {
var itmLocks itemLocks
guard.RLock()
for _, lockID := range lockIDs {