From 712a1fd2da3d2f3dadaed1fa6c6aa1d0f9f694b8 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 6 Jan 2020 17:12:19 +0200 Subject: [PATCH] Updated services reload --- engine/calldesc.go | 25 +++++++++++++++++++------ engine/ratingprofile.go | 2 +- engine/responder.go | 22 ++++++++++++++++++---- engine/storage_internal_datadb.go | 9 +++++++-- engine/storage_internal_stordb.go | 2 ++ engine/storage_mongo_datadb.go | 9 ++++++++- services/responders.go | 2 +- 7 files changed, 56 insertions(+), 15 deletions(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index 2e5f995a5..264f4f408 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "net" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -51,12 +52,13 @@ func init() { } var ( - dm *DataManager - cdrStorage CdrStorage - debitPeriod = 10 * time.Second - globalRoundingDecimals = 6 - connMgr *ConnManager - rpSubjectPrefixMatching bool + dm *DataManager + cdrStorage CdrStorage + debitPeriod = 10 * time.Second + globalRoundingDecimals = 6 + connMgr *ConnManager + rpSubjectPrefixMatching bool + rpSubjectPrefixMatchingMutex sync.RWMutex // used to reload rpSubjectPrefixMatching ) // SetDataStorage is the exported method to set the storage getter. @@ -74,8 +76,19 @@ func SetRoundingDecimals(rd int) { globalRoundingDecimals = rd } +// SetRpSubjectPrefixMatching sets rpSubjectPrefixMatching (is thread safe) func SetRpSubjectPrefixMatching(flag bool) { + rpSubjectPrefixMatchingMutex.Lock() rpSubjectPrefixMatching = flag + rpSubjectPrefixMatchingMutex.Unlock() +} + +// getRpSubjectPrefixMatching returns rpSubjectPrefixMatching (is thread safe) +func getRpSubjectPrefixMatching() (flag bool) { + rpSubjectPrefixMatchingMutex.RLock() + flag = rpSubjectPrefixMatching + rpSubjectPrefixMatchingMutex.RUnlock() + return } // SetCdrStorage sets the database for CDR storing, used by *cdrlog in first place diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 67b21d4f9..d3b551e36 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -253,7 +253,7 @@ type TenantRatingSubject struct { } func RatingProfileSubjectPrefixMatching(key string) (rp *RatingProfile, err error) { - if !rpSubjectPrefixMatching || strings.HasSuffix(key, utils.ANY) { + if !getRpSubjectPrefixMatching() || strings.HasSuffix(key, utils.ANY) { return dm.GetRatingProfile(key, false, utils.NonTransactional) } if rp, err = dm.GetRatingProfile(key, false, utils.NonTransactional); err == nil && rp != nil { // rp nil represents cached no-result diff --git a/engine/responder.go b/engine/responder.go index 04fc26301..16de647a7 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -21,6 +21,7 @@ package engine import ( "reflect" "strings" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -29,18 +30,31 @@ import ( ) type Responder struct { - ExitChan chan bool - Timeout time.Duration - Timezone string - MaxComputedUsage map[string]time.Duration + ExitChan chan bool + Timeout time.Duration + Timezone string + MaxComputedUsage map[string]time.Duration + maxComputedUsageMutex sync.RWMutex // used for MaxComputedUsage reload +} + +// SetMaxComputedUsage sets MaxComputedUsage, used for config reload (is thread safe) +func (rs *Responder) SetMaxComputedUsage(mx map[string]time.Duration) { + rs.maxComputedUsageMutex.Lock() + rs.MaxComputedUsage = make(map[string]time.Duration) + for k, v := range mx { + rs.MaxComputedUsage[k] = v + } + rs.maxComputedUsageMutex.Unlock() } // usageAllowed checks requested usage against configured MaxComputedUsage func (rs *Responder) usageAllowed(tor string, reqUsage time.Duration) (allowed bool) { + rs.maxComputedUsageMutex.RLock() mcu, has := rs.MaxComputedUsage[tor] if !has { mcu = rs.MaxComputedUsage[utils.ANY] } + rs.maxComputedUsageMutex.RUnlock() if reqUsage <= mcu { allowed = true } diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 5f97294dd..cfab65bcc 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -97,6 +97,7 @@ type InternalDB struct { mu sync.RWMutex stringIndexedFields []string prefixIndexedFields []string + indexedFieldsMutex sync.RWMutex // used for reload cnter *utils.Counter // used for OrderID for cdr } @@ -111,14 +112,18 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *Inte return } -// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload +// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe) func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) { + iDB.indexedFieldsMutex.Lock() iDB.stringIndexedFields = stringIndexedFields + iDB.indexedFieldsMutex.Unlock() } -// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload +// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload (is thread safe) func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) { + iDB.indexedFieldsMutex.Lock() iDB.prefixIndexedFields = prefixIndexedFields + iDB.indexedFieldsMutex.Unlock() } func (iDB *InternalDB) Close() {} diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index 6fd590450..99cc465cf 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -849,6 +849,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { } } idxs := utils.NewStringSet(nil) + iDB.indexedFieldsMutex.RLock() if len(iDB.stringIndexedFields) == 0 && len(iDB.prefixIndexedFields) == 0 { // add default indexes idxs.Add(utils.ConcatenatedKey(utils.CGRID, cdr.CGRID)) idxs.Add(utils.ConcatenatedKey(utils.RunID, cdr.RunID)) @@ -878,6 +879,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { } } } + iDB.indexedFieldsMutex.RUnlock() iDB.db.Set(utils.CDRsTBL, cdrKey, cdr, idxs.AsSlice(), cacheCommit(utils.NonTransactional), utils.NonTransactional) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index a4aac5797..1037af511 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -26,6 +26,7 @@ import ( "io/ioutil" "reflect" "strings" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -43,6 +44,7 @@ import ( "go.mongodb.org/mongo-driver/x/bsonx" ) +// Mongo collections names const ( ColDst = "destinations" ColRds = "reverse_destinations" @@ -200,6 +202,7 @@ type MongoStorage struct { client *mongo.Client ctx context.Context ctxTTL time.Duration + ctxTTLMutex sync.RWMutex // used for TTL reload db string storageType string // datadb, stordb ms Marshaler @@ -209,7 +212,9 @@ type MongoStorage struct { } func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err error) { + ms.ctxTTLMutex.RLock() ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL) + ms.ctxTTLMutex.RUnlock() defer ctxSessionCancel() return ms.client.UseSession(ctxSession, argfunc) } @@ -219,9 +224,11 @@ func (ms *MongoStorage) IsDataDB() bool { return ms.isDataDB } -// SetTTL set the context TTL used for queries +// SetTTL set the context TTL used for queries (is thread safe) func (ms *MongoStorage) SetTTL(ttl time.Duration) { + ms.ctxTTLMutex.Lock() ms.ctxTTL = ttl + ms.ctxTTLMutex.Unlock() } func (ms *MongoStorage) enusureIndex(colName string, uniq bool, keys ...string) error { diff --git a/services/responders.go b/services/responders.go index b6f6bc226..1a619cd24 100644 --- a/services/responders.go +++ b/services/responders.go @@ -83,7 +83,7 @@ func (resp *ResponderService) GetIntenternalChan() (conn chan rpcclient.ClientCo // Reload handles the change of config func (resp *ResponderService) Reload() (err error) { resp.Lock() - resp.resp.MaxComputedUsage = resp.cfg.RalsCfg().MaxComputedUsage // this may cause concurrency problems + resp.resp.SetMaxComputedUsage(resp.cfg.RalsCfg().MaxComputedUsage) resp.Unlock() return }