Updated services reload

This commit is contained in:
Trial97
2020-01-06 17:12:19 +02:00
parent 21285b4bbc
commit 712a1fd2da
7 changed files with 56 additions and 15 deletions

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -51,12 +52,13 @@ func init() {
}
var (
dm *DataManager
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 6
connMgr *ConnManager
rpSubjectPrefixMatching bool
dm *DataManager
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 6
connMgr *ConnManager
rpSubjectPrefixMatching bool
rpSubjectPrefixMatchingMutex sync.RWMutex // used to reload rpSubjectPrefixMatching
)
// SetDataStorage is the exported method to set the storage getter.
@@ -74,8 +76,19 @@ func SetRoundingDecimals(rd int) {
globalRoundingDecimals = rd
}
// SetRpSubjectPrefixMatching sets rpSubjectPrefixMatching (is thread safe)
func SetRpSubjectPrefixMatching(flag bool) {
rpSubjectPrefixMatchingMutex.Lock()
rpSubjectPrefixMatching = flag
rpSubjectPrefixMatchingMutex.Unlock()
}
// getRpSubjectPrefixMatching returns rpSubjectPrefixMatching (is thread safe)
func getRpSubjectPrefixMatching() (flag bool) {
rpSubjectPrefixMatchingMutex.RLock()
flag = rpSubjectPrefixMatching
rpSubjectPrefixMatchingMutex.RUnlock()
return
}
// SetCdrStorage sets the database for CDR storing, used by *cdrlog in first place

View File

@@ -253,7 +253,7 @@ type TenantRatingSubject struct {
}
func RatingProfileSubjectPrefixMatching(key string) (rp *RatingProfile, err error) {
if !rpSubjectPrefixMatching || strings.HasSuffix(key, utils.ANY) {
if !getRpSubjectPrefixMatching() || strings.HasSuffix(key, utils.ANY) {
return dm.GetRatingProfile(key, false, utils.NonTransactional)
}
if rp, err = dm.GetRatingProfile(key, false, utils.NonTransactional); err == nil && rp != nil { // rp nil represents cached no-result

View File

@@ -21,6 +21,7 @@ package engine
import (
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -29,18 +30,31 @@ import (
)
type Responder struct {
ExitChan chan bool
Timeout time.Duration
Timezone string
MaxComputedUsage map[string]time.Duration
ExitChan chan bool
Timeout time.Duration
Timezone string
MaxComputedUsage map[string]time.Duration
maxComputedUsageMutex sync.RWMutex // used for MaxComputedUsage reload
}
// SetMaxComputedUsage sets MaxComputedUsage, used for config reload (is thread safe)
func (rs *Responder) SetMaxComputedUsage(mx map[string]time.Duration) {
rs.maxComputedUsageMutex.Lock()
rs.MaxComputedUsage = make(map[string]time.Duration)
for k, v := range mx {
rs.MaxComputedUsage[k] = v
}
rs.maxComputedUsageMutex.Unlock()
}
// usageAllowed checks requested usage against configured MaxComputedUsage
func (rs *Responder) usageAllowed(tor string, reqUsage time.Duration) (allowed bool) {
rs.maxComputedUsageMutex.RLock()
mcu, has := rs.MaxComputedUsage[tor]
if !has {
mcu = rs.MaxComputedUsage[utils.ANY]
}
rs.maxComputedUsageMutex.RUnlock()
if reqUsage <= mcu {
allowed = true
}

View File

@@ -97,6 +97,7 @@ type InternalDB struct {
mu sync.RWMutex
stringIndexedFields []string
prefixIndexedFields []string
indexedFieldsMutex sync.RWMutex // used for reload
cnter *utils.Counter // used for OrderID for cdr
}
@@ -111,14 +112,18 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *Inte
return
}
// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload
// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe)
func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) {
iDB.indexedFieldsMutex.Lock()
iDB.stringIndexedFields = stringIndexedFields
iDB.indexedFieldsMutex.Unlock()
}
// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload
// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload (is thread safe)
func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) {
iDB.indexedFieldsMutex.Lock()
iDB.prefixIndexedFields = prefixIndexedFields
iDB.indexedFieldsMutex.Unlock()
}
func (iDB *InternalDB) Close() {}

View File

@@ -849,6 +849,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
}
}
idxs := utils.NewStringSet(nil)
iDB.indexedFieldsMutex.RLock()
if len(iDB.stringIndexedFields) == 0 && len(iDB.prefixIndexedFields) == 0 { // add default indexes
idxs.Add(utils.ConcatenatedKey(utils.CGRID, cdr.CGRID))
idxs.Add(utils.ConcatenatedKey(utils.RunID, cdr.RunID))
@@ -878,6 +879,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
}
}
}
iDB.indexedFieldsMutex.RUnlock()
iDB.db.Set(utils.CDRsTBL, cdrKey, cdr, idxs.AsSlice(),
cacheCommit(utils.NonTransactional), utils.NonTransactional)

View File

@@ -26,6 +26,7 @@ import (
"io/ioutil"
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -43,6 +44,7 @@ import (
"go.mongodb.org/mongo-driver/x/bsonx"
)
// Mongo collections names
const (
ColDst = "destinations"
ColRds = "reverse_destinations"
@@ -200,6 +202,7 @@ type MongoStorage struct {
client *mongo.Client
ctx context.Context
ctxTTL time.Duration
ctxTTLMutex sync.RWMutex // used for TTL reload
db string
storageType string // datadb, stordb
ms Marshaler
@@ -209,7 +212,9 @@ type MongoStorage struct {
}
func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err error) {
ms.ctxTTLMutex.RLock()
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
ms.ctxTTLMutex.RUnlock()
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, argfunc)
}
@@ -219,9 +224,11 @@ func (ms *MongoStorage) IsDataDB() bool {
return ms.isDataDB
}
// SetTTL set the context TTL used for queries
// SetTTL set the context TTL used for queries (is thread safe)
func (ms *MongoStorage) SetTTL(ttl time.Duration) {
ms.ctxTTLMutex.Lock()
ms.ctxTTL = ttl
ms.ctxTTLMutex.Unlock()
}
func (ms *MongoStorage) enusureIndex(colName string, uniq bool, keys ...string) error {

View File

@@ -83,7 +83,7 @@ func (resp *ResponderService) GetIntenternalChan() (conn chan rpcclient.ClientCo
// Reload handles the change of config
func (resp *ResponderService) Reload() (err error) {
resp.Lock()
resp.resp.MaxComputedUsage = resp.cfg.RalsCfg().MaxComputedUsage // this may cause concurrency problems
resp.resp.SetMaxComputedUsage(resp.cfg.RalsCfg().MaxComputedUsage)
resp.Unlock()
return
}