From 5f82c34cad13ec0359cd1046fd227d1178a3461d Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 11 May 2018 17:45:53 +0200 Subject: [PATCH] Tester loading events from file, tighter guardian exposure, default locking timeout to 0 --- cdrc/partial_cdr.go | 2 +- cdrc/unpairedrecords.go | 2 +- cmd/cgr-tester/cgr-tester.go | 14 +++++ cmd/cgr-tester/filereader.go | 104 +++++++++++++++++++++++++++++++++++ config/config_defaults.go | 2 +- config/config_json_test.go | 2 +- config/config_test.go | 2 +- engine/cdrs.go | 2 +- guardian/guardian.go | 55 ++++++++++-------- 9 files changed, 156 insertions(+), 29 deletions(-) create mode 100644 cmd/cgr-tester/filereader.go diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index ac2ac003a..9d8e74194 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -54,7 +54,7 @@ type PartialRecordsCache struct { cdrs rpcclient.RpcClientConnection partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset - guard *guardian.GuardianLock + guard *guardian.GuardianLocker } // Dumps the cache into a .unpaired file in the outdir and cleans cache after diff --git a/cdrc/unpairedrecords.go b/cdrc/unpairedrecords.go index c32a6eb77..4ed8a62bb 100644 --- a/cdrc/unpairedrecords.go +++ b/cdrc/unpairedrecords.go @@ -41,7 +41,7 @@ type UnpairedRecordsCache struct { cdrOutDir string csvSep rune partialRecords map[string]map[string]*UnpairedRecord // [FileName"][OriginID]*PartialRecord - guard *guardian.GuardianLock + guard *guardian.GuardianLocker } // Dumps the cache into a .unpaired file in the outdir and cleans cache after diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 045e6bae0..6f9b26ffa 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -58,6 +58,8 @@ var ( version = flag.Bool("version", false, "Prints the application version.") nilDuration = time.Duration(0) usage = flag.String("usage", "1m", "The duration to use in call simulation.") + fPath = flag.String("file_path", "", "read requests from file with path") + reqSep = flag.String("req_separator", "\n\n", "separator for requests in file") ) func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { @@ -154,6 +156,18 @@ func main() { pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } + if *fPath != "" { + frt, err := NewFileReaderTester(*fPath, *raterAddress, + *parallel, *runs, []byte(*reqSep)) + if err != nil { + log.Fatal(err) + } + if err := frt.Test(); err != nil { + log.Fatal(err) + } + return + } + var timeparsed time.Duration var err error tstart := time.Now() diff --git a/cmd/cgr-tester/filereader.go b/cmd/cgr-tester/filereader.go new file mode 100644 index 000000000..bf956f7ea --- /dev/null +++ b/cmd/cgr-tester/filereader.go @@ -0,0 +1,104 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package main + +import ( + "bufio" + "bytes" + "io" + "io/ioutil" + "log" + "math/rand" + "net" + "os" + "sync" + "time" +) + +func NewFileReaderTester(fPath, cgrAddr string, parallel, runs int, reqSep []byte) (frt *FileReaderTester, err error) { + frt = &FileReaderTester{ + parallel: parallel, runs: runs, + reqSep: reqSep, + } + if frt.rdr, err = os.Open(fPath); err != nil { + return nil, err + } + if frt.conn, err = net.Dial("tcp", cgrAddr); err != nil { + return nil, err + } + return +} + +// TesterReader will read requests from file and post them remotely +type FileReaderTester struct { + parallel int + runs int + reqSep []byte + + rdr io.Reader + conn net.Conn + connScnr *bufio.Scanner +} + +func (frt *FileReaderTester) connSendReq(req []byte) (err error) { + frt.conn.SetReadDeadline(time.Now().Add(time.Millisecond)) // will block most of the times on read + if _, err = frt.conn.Write(req); err != nil { + return + } + ioutil.ReadAll(frt.conn) + return +} + +// Test reads from rdr, split the content based on lineSep and sends individual lines to remote +func (frt *FileReaderTester) Test() (err error) { + var fContent []byte + if fContent, err = ioutil.ReadAll(frt.rdr); err != nil { + return + } + reqs := bytes.Split(fContent, frt.reqSep) + + // parallel requests + if frt.parallel > 0 { + var wg sync.WaitGroup + reqLimiter := make(chan struct{}, frt.parallel) + for i := 0; i < frt.runs; i++ { + wg.Add(1) + go func(i int) { + reqLimiter <- struct{}{} // block till buffer will allow + if err := frt.connSendReq(reqs[rand.Intn(len(reqs))]); err != nil { + log.Printf("ERROR: %s", err.Error()) + } + <-reqLimiter // release one request from buffer + wg.Done() + }(i) + } + wg.Wait() + return + } + + // serial requests + for i := 0; i < frt.runs; i++ { + for _, req := range reqs { + if err := frt.connSendReq(req); err != nil { + log.Printf("ERROR: %s", err.Error()) + } + } + } + return +} diff --git a/config/config_defaults.go b/config/config_defaults.go index 9ba4bccf9..374e02336 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -47,7 +47,7 @@ const CGRATES_CFG_JSON = ` "reply_timeout": "2s", // consider connection down for replies taking longer than this value "response_cache_ttl": "0s", // the life span of a cached response "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up - "locking_timeout": "5s", // timeout internal locks to avoid deadlocks + "locking_timeout": "0", // timeout internal locks to avoid deadlocks }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 51c5a6ae4..cfa482c08 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -57,7 +57,7 @@ func TestDfGeneralJsonCfg(t *testing.T) { Reply_timeout: utils.StringPointer("2s"), Response_cache_ttl: utils.StringPointer("0s"), Internal_ttl: utils.StringPointer("2m"), - Locking_timeout: utils.StringPointer("5s"), + Locking_timeout: utils.StringPointer("0"), } if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 9fe38f47b..faea43bea 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -208,7 +208,7 @@ func TestCgrCfgJSONDefaultsGeneral(t *testing.T) { if cgrCfg.InternalTtl != 2*time.Minute { t.Error(cgrCfg.InternalTtl) } - if cgrCfg.LockingTimeout != 5*time.Second { + if cgrCfg.LockingTimeout != 0 { t.Error(cgrCfg.LockingTimeout) } if cgrCfg.Logger != utils.MetaSysLog { diff --git a/engine/cdrs.go b/engine/cdrs.go index fb0624d6c..2811ca95e 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -112,7 +112,7 @@ type CdrServer struct { cdrstats rpcclient.RpcClientConnection thdS rpcclient.RpcClientConnection stats rpcclient.RpcClientConnection - guard *guardian.GuardianLock + guard *guardian.GuardianLocker responseCache *utils.ResponseCache httpPoster *utils.HTTPPoster // used for replication } diff --git a/guardian/guardian.go b/guardian/guardian.go index 52045da30..f3ee3033d 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -19,44 +19,52 @@ along with this program. If not, see package guardian import ( + "fmt" "sync" - "sync/atomic" "time" ) // global package variable -var Guardian = &GuardianLock{locksMap: make(map[string]*itemLock)} +var Guardian = &GuardianLocker{locksMap: make(map[string]*itemLock)} -func newItemLock(keyID string) *itemLock { - return &itemLock{keyID: keyID} +func newItemLock(keyID string) (il *itemLock) { + il = &itemLock{keyID: keyID} + il.lock() // need to return it already locked so we don't have concurrency on creation/unlock + return } // itemLock represents one lock with key autodestroy type itemLock struct { - keyID string // store it so we know what to destroy - cnt int64 - sync.Mutex + keyID string // store it so we know what to destroy + cnt int64 + cntLck sync.Mutex // protect the counter + lk sync.Mutex // real lock } // lock() executes combined lock with increasing counter func (il *itemLock) lock() { - atomic.AddInt64(&il.cnt, 1) - il.Lock() + il.cntLck.Lock() + il.cnt += 1 + il.cntLck.Unlock() + il.lk.Lock() } // unlock() executes combined lock with autoremoving lock from Guardian func (il *itemLock) unlock() { - atomic.AddInt64(&il.cnt, -1) - cnt := atomic.LoadInt64(&il.cnt) - if cnt < 0 { // already unlocked + il.cntLck.Lock() + if il.cnt < 1 { // already unlocked + fmt.Sprintf(" itemLock with id: %s with counter smaller than 0", il.keyID) + il.cntLck.Unlock() return } - if cnt == 0 { // last lock in the queue + il.cnt -= 1 + if il.cnt == 0 { // last lock in the queue Guardian.Lock() delete(Guardian.locksMap, il.keyID) Guardian.Unlock() } - il.Unlock() + il.lk.Unlock() + il.cntLck.Unlock() } type itemLocks []*itemLock @@ -73,33 +81,34 @@ func (ils itemLocks) unlock() { } } -// GuardianLock is an optimized locking system per locking key -type GuardianLock struct { +// GuardianLocker is an optimized locking system per locking key +type GuardianLocker struct { locksMap map[string]*itemLock sync.RWMutex // protects the maps } // lockItems locks a set of lockIDs // returning the lock structs so they can be later unlocked -func (guard *GuardianLock) lockItems(lockIDs []string) (itmLocks itemLocks) { +func (guard *GuardianLocker) lockItems(lockIDs []string) (itmLocks itemLocks) { guard.Lock() + var toLockItms itemLocks for _, lockID := range lockIDs { - var itmLock *itemLock itmLock, exists := guard.locksMap[lockID] if !exists { itmLock = newItemLock(lockID) guard.locksMap[lockID] = itmLock + } else { + toLockItms = append(toLockItms, itmLock) } itmLocks = append(itmLocks, itmLock) } guard.Unlock() - - itmLocks.lock() + toLockItms.lock() return } // Guard executes the handler between locks -func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { +func (guard *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { itmLocks := guard.lockItems(lockIDs) rplyChan := make(chan interface{}) @@ -131,7 +140,7 @@ func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout ti } // GuardTimed aquires a lock for duration -func (guard *GuardianLock) GuardIDs(timeout time.Duration, lockIDs ...string) { +func (guard *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) { guard.lockItems(lockIDs) if timeout != 0 { go func(timeout time.Duration, lockIDs ...string) { @@ -143,7 +152,7 @@ func (guard *GuardianLock) GuardIDs(timeout time.Duration, lockIDs ...string) { } // UnguardTimed attempts to unlock a set of locks based on their locksUUID -func (guard *GuardianLock) UnguardIDs(lockIDs ...string) { +func (guard *GuardianLocker) UnguardIDs(lockIDs ...string) { var itmLocks itemLocks guard.RLock() for _, lockID := range lockIDs {