Add sample for concurrent request limitation

This commit is contained in:
TeoV
2020-07-06 18:52:11 +03:00
parent 9fc9d4b037
commit 6f0a9afdc7
4 changed files with 114 additions and 0 deletions

View File

@@ -38,6 +38,11 @@ type AccountActionTiming struct {
}
func (apierSv1 *APIerSv1) GetAccountActionPlan(attrs *utils.TenantAccount, reply *[]*AccountActionTiming) error {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attrs, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(strings.Join(missing, ","), "")
}
@@ -86,6 +91,10 @@ type AttrRemoveActionTiming struct {
// Removes an ActionTimings or parts of it depending on filters being set
func (apierSv1 *APIerSv1) RemoveActionTiming(attrs *AttrRemoveActionTiming, reply *string) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attrs, []string{"ActionPlanId"}); len(missing) != 0 { // Only mandatory ActionPlanId
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -173,6 +182,10 @@ func (apierSv1 *APIerSv1) RemoveActionTiming(attrs *AttrRemoveActionTiming, repl
// Ads a new account into dataDb. If already defined, returns success.
func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attr, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -294,6 +307,10 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
}
func (apierSv1 *APIerSv1) RemoveAccount(attr *utils.AttrRemoveAccount, reply *string) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attr, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -352,6 +369,10 @@ func (apierSv1 *APIerSv1) RemoveAccount(attr *utils.AttrRemoveAccount, reply *st
}
func (apierSv1 *APIerSv1) GetAccounts(attr *utils.AttrGetAccounts, reply *[]interface{}) error {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if len(attr.Tenant) == 0 {
return utils.NewErrMandatoryIeMissing("Tenant")
}
@@ -421,9 +442,17 @@ type AttrAddBalance struct {
}
func (apierSv1 *APIerSv1) AddBalance(attr *AttrAddBalance, reply *string) error {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
return apierSv1.modifyBalance(utils.TOPUP, attr, reply)
}
func (apierSv1 *APIerSv1) DebitBalance(attr *AttrAddBalance, reply *string) error {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
return apierSv1.modifyBalance(utils.DEBIT, attr, reply)
}
@@ -501,6 +530,10 @@ func (apierSv1 *APIerSv1) modifyBalance(aType string, attr *AttrAddBalance, repl
// SetBalance sets the balance for the given account
// if the account is not already created it will create the account also
func (apierSv1 *APIerSv1) SetBalance(attr *utils.AttrSetBalance, reply *string) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attr, []string{"Tenant", "Account", "BalanceType"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -573,6 +606,10 @@ func (apierSv1 *APIerSv1) SetBalance(attr *utils.AttrSetBalance, reply *string)
// RemoveBalances remove the matching balances for the account
func (apierSv1 *APIerSv1) RemoveBalances(attr *utils.AttrSetBalance, reply *string) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if missing := utils.MissingStructFields(attr, []string{"Tenant", "Account", "BalanceType"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -607,6 +644,10 @@ func (apierSv1 *APIerSv1) RemoveBalances(attr *utils.AttrSetBalance, reply *stri
}
func (apierSv1 *APIerSv1) GetAccountsCount(attr *utils.TenantArg, reply *int) (err error) {
if err := config.ConReqs().VerifyAndGet(); err != nil {
return err
}
defer config.ConReqs().Putback()
if len(attr.Tenant) == 0 {
return utils.NewErrMandatoryIeMissing("Tenant")
}

63
config/concureqs.go Normal file
View File

@@ -0,0 +1,63 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
type ConcReqs struct {
aReqs chan struct{}
nAReqs int
strategy string
}
func NewConReqs(reqs int, strategy string) *ConcReqs {
return &ConcReqs{
aReqs: make(chan struct{}, reqs),
nAReqs: reqs,
strategy: strategy,
}
}
func (cR *ConcReqs) VerifyAndGet() (err error) {
if cR.nAReqs == 0 {
return
}
switch cR.strategy {
case utils.MetaBusy:
if len(cR.aReqs) == 0 {
return fmt.Errorf("denying request due to maximum active requests reached")
}
fallthrough
case utils.MetaQueue:
<-cR.aReqs // get from channel
}
return
}
func (cR *ConcReqs) Putback() {
if cR.nAReqs == 0 {
return
}
cR.aReqs <- struct{}{}
return
}

View File

@@ -46,6 +46,7 @@ var (
dfltAstConnCfg *AsteriskConnCfg
dfltLoaderConfig *LoaderSCfg
dfltLoaderDataTypeConfig *LoaderDataType
conReqs *ConcReqs
)
func newDbDefaults() dbDefaults {
@@ -119,6 +120,7 @@ func (dbDflt dbDefaults) dbPass(dbType string, flagInput string) string {
func init() {
cgrCfg, _ = NewDefaultCGRConfig()
dbDefaultsCfg = newDbDefaults()
conReqs = NewConReqs(cgrCfg.generalCfg.ConcurrentRequests, cgrCfg.generalCfg.ConcurrentStrategy)
}
// CgrConfig is used to retrieve system configuration from other packages
@@ -126,6 +128,11 @@ func CgrConfig() *CGRConfig {
return cgrCfg
}
// ConReqs is used to retrieve system ConcurrentRequests counter from other packages
func ConReqs() *ConcReqs {
return conReqs
}
// SetCgrConfig is used to set system configuration from other places
func SetCgrConfig(cfg *CGRConfig) {
cgrCfg = cfg
@@ -241,6 +248,7 @@ func NewCGRConfigFromPath(path string) (cfg *CGRConfig, err error) {
return
}
err = cfg.checkConfigSanity()
conReqs = NewConReqs(cfg.generalCfg.ConcurrentRequests, cfg.generalCfg.ConcurrentStrategy)
return
}

View File

@@ -767,6 +767,8 @@ const (
RateS = "RateS"
Underline = "_"
MetaPartial = "*partial"
MetaBusy = "*busy"
MetaQueue = "*queue"
)
// Migrator Action