mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added timeout for Guardian.Guard
This commit is contained in:
committed by
Dan Christian Bogos
parent
2c081cc372
commit
f8de227351
@@ -25,6 +25,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -73,7 +74,7 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun
|
||||
}
|
||||
}
|
||||
return accountATs, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -154,7 +155,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) (
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -251,7 +252,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -277,7 +278,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
@@ -324,7 +325,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -332,7 +333,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -641,7 +641,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) (err
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -689,11 +689,8 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
|
||||
dbReader := engine.NewTpReader(self.DataManager.DataDB(), self.StorDb,
|
||||
attrs.TPid, self.Config.GeneralCfg().DefaultTimezone)
|
||||
if _, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err := dbReader.LoadAccountActionsFiltered(&attrs); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, attrs.LoadId); err != nil {
|
||||
return 0, dbReader.LoadAccountActionsFiltered(&attrs)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.LoadId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache
|
||||
@@ -1337,12 +1334,8 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st
|
||||
return utils.ErrMandatoryIeMissing
|
||||
}
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := self.DataManager.RemoveRatingProfile(attr.GetId(), utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, "RemoveRatingProfile")
|
||||
return 0, self.DataManager.RemoveRatingProfile(attr.GetId(), utils.NonTransactional)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, "RemoveRatingProfile")
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1513,10 +1506,7 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
|
||||
if fileContent, err = ioutil.ReadFile(filePath); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
return 0, os.Remove(filePath)
|
||||
}, v1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1556,11 +1546,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer fileOut.Close()
|
||||
if _, err := fileOut.Write(fileContent); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
_, err = fileOut.Write(fileContent)
|
||||
fileOut.Close()
|
||||
return 0, err
|
||||
}, v1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+failoverPath)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
@@ -21,6 +21,7 @@ package v1
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -96,11 +97,8 @@ func (self *ApierV1) AddAccountActionTriggers(attr AttrAddAccountActionTriggers,
|
||||
}
|
||||
}
|
||||
account.InitCounters()
|
||||
if err := self.DataManager.DataDB().SetAccount(account); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
@@ -139,11 +137,8 @@ func (self *ApierV1) RemoveAccountActionTriggers(attr AttrRemoveAccountActionTri
|
||||
}
|
||||
account.ActionTriggers = newActionTriggers
|
||||
account.InitCounters()
|
||||
if err := self.DataManager.DataDB().SetAccount(account); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
@@ -184,11 +179,8 @@ func (self *ApierV1) ResetAccountActionTriggers(attr AttrResetAccountActionTrigg
|
||||
if attr.Executed == false {
|
||||
account.ExecuteActionTriggers(nil)
|
||||
}
|
||||
if err := self.DataManager.DataDB().SetAccount(account); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
@@ -329,11 +321,8 @@ func (self *ApierV1) SetAccountActionTriggers(attr AttrSetAccountActionTriggers,
|
||||
|
||||
}
|
||||
account.ExecuteActionTriggers(nil)
|
||||
if err := self.DataManager.DataDB().SetAccount(account); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
@@ -628,11 +617,8 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string
|
||||
}
|
||||
acnt.ActionTriggers = append(acnt.ActionTriggers, at)
|
||||
|
||||
if err = self.DataManager.DataDB().SetAccount(acnt); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, acntID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(acnt)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, acntID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -189,11 +190,8 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error {
|
||||
if err := self.DataManager.DataDB().SetAccountActionPlans(accID, acntAPids, true); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err = self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
return 0, self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -231,11 +229,8 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error {
|
||||
ub.Disabled = *attr.Disabled
|
||||
}
|
||||
// All prepared, save account
|
||||
if err := self.DataManager.DataDB().SetAccount(ub); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(ub)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"strings"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -72,11 +73,8 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
|
||||
tpAa := &utils.TPAccountActions{TPid: attrs.TPid}
|
||||
tpAa.SetAccountActionsId(attrs.AccountActionsId)
|
||||
if _, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err := dbReader.LoadAccountActionsFiltered(tpAa); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, attrs.AccountActionsId); err != nil {
|
||||
return 0, dbReader.LoadAccountActionsFiltered(tpAa)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.AccountActionsId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
sched := self.ServManager.GetScheduler()
|
||||
|
||||
@@ -21,6 +21,7 @@ package v2
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -186,11 +187,8 @@ func (self *ApierV2) SetAccountActionTriggers(attr AttrSetAccountActionTriggers,
|
||||
}
|
||||
}
|
||||
account.ExecuteActionTriggers(nil)
|
||||
if err := self.DataManager.DataDB().SetAccount(account); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
return 0, self.DataManager.DataDB().SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/structmatcher"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -172,7 +173,7 @@ func (acc *Account) setBalanceAction(a *Action) error {
|
||||
i++
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, balance.SharedGroups.Slice()...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, balance.SharedGroups.Slice()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -264,7 +265,7 @@ func (ub *Account) debitBalanceAction(a *Action, reset, resetIfNegative bool) er
|
||||
}
|
||||
dm.CacheDataFromDB(utils.SHARED_GROUP_PREFIX, sgs, true)
|
||||
return 0, nil
|
||||
}, 0, bClone.SharedGroups.Slice()...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, bClone.SharedGroups.Slice()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -565,7 +565,7 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac
|
||||
}
|
||||
return 0, nil
|
||||
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -353,7 +353,7 @@ func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err
|
||||
dm.DataDB().SetAccount(acc)
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accID)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
}
|
||||
if len(at.accountIDs) == 0 { // action timing executing without accounts
|
||||
for _, a := range aac {
|
||||
|
||||
@@ -761,9 +761,9 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
duration, err = cd.getMaxSessionDuration(account)
|
||||
return
|
||||
}, 0, lkIDs...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -837,9 +837,9 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
cc.AccountSummary = cd.AccountSummary()
|
||||
}
|
||||
return
|
||||
}, 0, lkIDs...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -906,9 +906,9 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
}
|
||||
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
|
||||
return
|
||||
}, 0, lkIDs...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
|
||||
return cc, err
|
||||
}
|
||||
|
||||
@@ -971,7 +971,7 @@ func (cd *CallDescriptor) RefundIncrements() (acnt *Account, err error) {
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
acnt, err = cd.refundIncrements()
|
||||
return
|
||||
}, 0, accMap.Slice()...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accMap.Slice()...)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1014,7 +1014,7 @@ func (cd *CallDescriptor) RefundRounding() (err error) {
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
err = cd.refundRounding()
|
||||
return
|
||||
}, 0, accMap.Slice()...)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accMap.Slice()...)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -138,11 +138,9 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer fileOut.Close()
|
||||
if _, err := fileOut.Write(body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
_, err = fileOut.Write(body)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
}
|
||||
return
|
||||
@@ -348,11 +346,9 @@ func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer fileOut.Close()
|
||||
if _, err := fileOut.Write(content); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
_, err = fileOut.Write(content)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
|
||||
}
|
||||
r, e := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return arg.GetCost()
|
||||
}, 0, arg.GetAccountKey())
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, arg.GetAccountKey())
|
||||
if r != nil {
|
||||
*reply = *r.(*CallCost)
|
||||
}
|
||||
|
||||
@@ -1505,7 +1505,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance,
|
||||
)
|
||||
return err
|
||||
})
|
||||
}, 0, utils.LOADINST_KEY)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.LOADINST_KEY)
|
||||
|
||||
Cache.Remove(utils.LOADINST_KEY, "",
|
||||
cacheCommit(transactionID), transactionID)
|
||||
|
||||
@@ -991,9 +991,8 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
err = rs.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err
|
||||
return nil, err
|
||||
}, 0, utils.LOADINST_KEY)
|
||||
return nil, rs.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.LOADINST_KEY)
|
||||
|
||||
Cache.Remove(utils.LOADINST_KEY, "",
|
||||
cacheCommit(transactionID), transactionID)
|
||||
|
||||
Reference in New Issue
Block a user