mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Updated guardian
This commit is contained in:
committed by
Dan Christian Bogos
parent
fa3eacbdec
commit
4555789d53
@@ -46,21 +46,21 @@ func (apierSv1 *APIerSv1) GetAccountActionPlan(attrs *utils.TenantAccount, reply
|
||||
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
|
||||
}
|
||||
acntID := utils.ConcatenatedKey(tnt, attrs.Account)
|
||||
acntATsIf, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
accountATs := make([]*AccountActionTiming, 0) // needs to be initialized if remains empty
|
||||
if err := guardian.Guardian.Guard(func() error {
|
||||
acntAPids, err := apierSv1.DataManager.GetAccountActionPlans(acntID, true, true, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return nil, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
var acntAPs []*engine.ActionPlan
|
||||
for _, apID := range acntAPids {
|
||||
if ap, err := apierSv1.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
} else if ap != nil {
|
||||
acntAPs = append(acntAPs, ap)
|
||||
}
|
||||
}
|
||||
|
||||
accountATs := make([]*AccountActionTiming, 0) // needs to be initialized if remains empty
|
||||
for _, ap := range acntAPs {
|
||||
for _, at := range ap.ActionTimings {
|
||||
accountATs = append(accountATs, &AccountActionTiming{
|
||||
@@ -71,12 +71,11 @@ func (apierSv1 *APIerSv1) GetAccountActionPlan(attrs *utils.TenantAccount, reply
|
||||
})
|
||||
}
|
||||
}
|
||||
return accountATs, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = acntATsIf.([]*AccountActionTiming)
|
||||
*reply = accountATs
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -106,12 +105,12 @@ func (apierSv1 *APIerSv1) RemoveActionTiming(attrs *AttrRemoveActionTiming, repl
|
||||
}
|
||||
|
||||
var remAcntAPids []string // list of accounts who's indexes need modification
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err = guardian.Guardian.Guard(func() error {
|
||||
ap, err := apierSv1.DataManager.GetActionPlan(attrs.ActionPlanId, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
} else if ap == nil {
|
||||
return 0, utils.ErrNotFound
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
if accID != "" {
|
||||
delete(ap.AccountIDs, accID)
|
||||
@@ -141,17 +140,17 @@ func (apierSv1 *APIerSv1) RemoveActionTiming(attrs *AttrRemoveActionTiming, repl
|
||||
|
||||
UPDATE:
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.ActionPlanIDs: {attrs.ActionPlanId}},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for _, acntID := range remAcntAPids {
|
||||
if err = apierSv1.DataManager.RemAccountActionPlans(acntID, []string{attrs.ActionPlanId}); err != nil {
|
||||
return 0, nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if len(remAcntAPids) != 0 {
|
||||
@@ -159,12 +158,11 @@ func (apierSv1 *APIerSv1) RemoveActionTiming(attrs *AttrRemoveActionTiming, repl
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.AccountActionPlanIDs: remAcntAPids},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix); err != nil {
|
||||
*reply = err.Error()
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
@@ -190,7 +188,7 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
}
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
dirtyActionPlans := make(map[string]*engine.ActionPlan)
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err = guardian.Guardian.Guard(func() error {
|
||||
var ub *engine.Account
|
||||
if bal, _ := apierSv1.DataManager.GetAccount(accID); bal != nil {
|
||||
ub = bal
|
||||
@@ -200,10 +198,10 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
}
|
||||
}
|
||||
if attr.ActionPlanID != "" {
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err := guardian.Guardian.Guard(func() error {
|
||||
acntAPids, err := apierSv1.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
// clean previous action plans
|
||||
for i := 0; i < len(acntAPids); {
|
||||
@@ -214,7 +212,7 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
}
|
||||
ap, err := apierSv1.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
delete(ap.AccountIDs, accID)
|
||||
dirtyActionPlans[apID] = ap
|
||||
@@ -223,7 +221,7 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
if !utils.IsSliceMember(acntAPids, attr.ActionPlanID) { // Account not yet attached to action plan, do it here
|
||||
ap, err := apierSv1.DataManager.GetActionPlan(attr.ActionPlanID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if ap.AccountIDs == nil {
|
||||
ap.AccountIDs = make(utils.StringMap)
|
||||
@@ -240,7 +238,7 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
ActionsID: at.ActionsID,
|
||||
}
|
||||
if err = apierSv1.DataManager.DataDB().PushTask(t); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -249,31 +247,27 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
i := 0
|
||||
for actionPlanID, ap := range dirtyActionPlans {
|
||||
if err := apierSv1.DataManager.SetActionPlan(actionPlanID, ap, true, utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
apIDs[i] = actionPlanID
|
||||
i++
|
||||
}
|
||||
if err := apierSv1.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
|
||||
return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.AccountActionPlanIDs: {accID}, utils.ActionPlanIDs: apIDs},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}, reply)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if attr.ActionTriggersID != "" {
|
||||
atrs, err := apierSv1.DataManager.GetActionTriggers(attr.ActionTriggersID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
ub.ActionTriggers = atrs
|
||||
ub.InitCounters()
|
||||
@@ -286,12 +280,8 @@ func (apierSv1 *APIerSv1) SetAccount(attr *utils.AttrSetAccount, reply *string)
|
||||
ub.Disabled = dis
|
||||
}
|
||||
// All prepared, save account
|
||||
if err := apierSv1.DataManager.SetAccount(ub); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
return apierSv1.DataManager.SetAccount(ub)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if attr.ReloadScheduler && len(dirtyActionPlans) != 0 {
|
||||
@@ -315,16 +305,16 @@ func (apierSv1 *APIerSv1) RemoveAccount(attr *utils.AttrRemoveAccount, reply *st
|
||||
}
|
||||
dirtyActionPlans := make(map[string]*engine.ActionPlan)
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err = guardian.Guardian.Guard(func() error {
|
||||
// remove it from all action plans
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err := guardian.Guardian.Guard(func() error {
|
||||
actionPlansMap, err := apierSv1.DataManager.GetAllActionPlans()
|
||||
if err == utils.ErrNotFound {
|
||||
// no action plans
|
||||
return 0, nil
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
|
||||
for actionPlanID, ap := range actionPlansMap {
|
||||
@@ -337,20 +327,15 @@ func (apierSv1 *APIerSv1) RemoveAccount(attr *utils.AttrRemoveAccount, reply *st
|
||||
for actionPlanID, ap := range dirtyActionPlans {
|
||||
if err := apierSv1.DataManager.SetActionPlan(actionPlanID, ap, true,
|
||||
utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := apierSv1.DataManager.RemoveAccount(accID); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
return apierSv1.DataManager.RemoveAccount(accID)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if err = apierSv1.DataManager.RemAccountActionPlans(accID, nil); err != nil &&
|
||||
|
||||
@@ -765,12 +765,12 @@ func (apierSv1 *APIerSv1) SetActionPlan(attrs *AttrSetActionPlan, reply *string)
|
||||
return fmt.Errorf("%s:Action:%s:%v", utils.ErrMandatoryIeMissing.Error(), at.ActionsId, missing)
|
||||
}
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err = guardian.Guardian.Guard(func() error {
|
||||
var prevAccountIDs utils.StringMap
|
||||
if prevAP, err := apierSv1.DataManager.GetActionPlan(attrs.Id, true, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound {
|
||||
return 0, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
} else if err == nil && !attrs.Overwrite {
|
||||
return 0, utils.ErrExists
|
||||
return utils.ErrExists
|
||||
} else if prevAP != nil {
|
||||
prevAccountIDs = prevAP.AccountIDs
|
||||
}
|
||||
@@ -779,13 +779,13 @@ func (apierSv1 *APIerSv1) SetActionPlan(attrs *AttrSetActionPlan, reply *string)
|
||||
}
|
||||
for _, apiAtm := range attrs.ActionPlan {
|
||||
if exists, err := apierSv1.DataManager.HasData(utils.ActionPrefix, apiAtm.ActionsId, ""); err != nil {
|
||||
return 0, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
} else if !exists {
|
||||
return 0, fmt.Errorf("%s:%s", utils.ErrBrokenReference.Error(), apiAtm.ActionsId)
|
||||
return fmt.Errorf("%s:%s", utils.ErrBrokenReference.Error(), apiAtm.ActionsId)
|
||||
}
|
||||
timing, err := apiAtm.getRITiming(apierSv1.DataManager)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
ap.ActionTimings = append(ap.ActionTimings, &engine.ActionTiming{
|
||||
Uuid: utils.GenUUID(),
|
||||
@@ -795,17 +795,17 @@ func (apierSv1 *APIerSv1) SetActionPlan(attrs *AttrSetActionPlan, reply *string)
|
||||
})
|
||||
}
|
||||
if err := apierSv1.DataManager.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
|
||||
return 0, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.ActionPlanIDs: {ap.Id}},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for acntID := range prevAccountIDs {
|
||||
if err := apierSv1.DataManager.RemAccountActionPlans(acntID, []string{attrs.Id}); err != nil {
|
||||
return 0, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
}
|
||||
if len(prevAccountIDs) != 0 {
|
||||
@@ -814,10 +814,10 @@ func (apierSv1 *APIerSv1) SetActionPlan(attrs *AttrSetActionPlan, reply *string)
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.AccountActionPlanIDs: sl},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -979,19 +979,19 @@ func (apierSv1 *APIerSv1) RemoveActionPlan(attr *AttrGetActionPlan, reply *strin
|
||||
if missing := utils.MissingStructFields(attr, []string{"ID"}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if _, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err = guardian.Guardian.Guard(func() error {
|
||||
var prevAccountIDs utils.StringMap
|
||||
if prevAP, err := apierSv1.DataManager.GetActionPlan(attr.ID, true, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound {
|
||||
return 0, err
|
||||
return err
|
||||
} else if prevAP != nil {
|
||||
prevAccountIDs = prevAP.AccountIDs
|
||||
}
|
||||
if err := apierSv1.DataManager.RemoveActionPlan(attr.ID, utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for acntID := range prevAccountIDs {
|
||||
if err := apierSv1.DataManager.RemAccountActionPlans(acntID, []string{attr.ID}); err != nil {
|
||||
return 0, utils.NewErrServerError(err)
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
}
|
||||
if len(prevAccountIDs) != 0 {
|
||||
@@ -1000,10 +1000,10 @@ func (apierSv1 *APIerSv1) RemoveActionPlan(attr *AttrGetActionPlan, reply *strin
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.AccountActionPlanIDs: sl},
|
||||
}, reply); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1023,8 +1023,8 @@ func (apierSv1 *APIerSv1) LoadAccountActions(attrs *utils.TPAccountActions, repl
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if _, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return 0, dbReader.LoadAccountActionsFiltered(attrs)
|
||||
if err := guardian.Guardian.Guard(func() error {
|
||||
return dbReader.LoadAccountActionsFiltered(attrs)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.LoadId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
@@ -1254,8 +1254,8 @@ func (apierSv1 *APIerSv1) RemoveRatingProfile(attr *AttrRemoveRatingProfile, rep
|
||||
(attr.Category != "" && attr.Tenant == "") {
|
||||
return utils.ErrMandatoryIeMissing
|
||||
}
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return 0, apierSv1.DataManager.RemoveRatingProfile(attr.GetId())
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
return apierSv1.DataManager.RemoveRatingProfile(attr.GetId())
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, "RemoveRatingProfile")
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
|
||||
@@ -74,9 +74,9 @@ func (apierSv1 *APIerSv1) AddAccountActionTriggers(attr *AttrAddAccountActionTri
|
||||
}
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
var account *engine.Account
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err = guardian.Guardian.Guard(func() error {
|
||||
if account, err = apierSv1.DataManager.GetAccount(accID); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if attr.ActionTriggerOverwrite {
|
||||
account.ActionTriggers = make(engine.ActionTriggers, 0)
|
||||
@@ -84,7 +84,7 @@ func (apierSv1 *APIerSv1) AddAccountActionTriggers(attr *AttrAddAccountActionTri
|
||||
for _, actionTriggerID := range attr.ActionTriggerIDs {
|
||||
atrs, err := apierSv1.DataManager.GetActionTriggers(actionTriggerID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for _, at := range atrs {
|
||||
var found bool
|
||||
@@ -102,7 +102,7 @@ func (apierSv1 *APIerSv1) AddAccountActionTriggers(attr *AttrAddAccountActionTri
|
||||
}
|
||||
}
|
||||
account.InitCounters()
|
||||
return 0, apierSv1.DataManager.SetAccount(account)
|
||||
return apierSv1.DataManager.SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -127,12 +127,12 @@ func (apierSv1 *APIerSv1) RemoveAccountActionTriggers(attr *AttrRemoveAccountAct
|
||||
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
|
||||
}
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
var account *engine.Account
|
||||
if acc, err := apierSv1.DataManager.GetAccount(accID); err == nil {
|
||||
account = acc
|
||||
} else {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
var newActionTriggers engine.ActionTriggers
|
||||
for _, at := range account.ActionTriggers {
|
||||
@@ -145,7 +145,7 @@ func (apierSv1 *APIerSv1) RemoveAccountActionTriggers(attr *AttrRemoveAccountAct
|
||||
}
|
||||
account.ActionTriggers = newActionTriggers
|
||||
account.InitCounters()
|
||||
return 0, apierSv1.DataManager.SetAccount(account)
|
||||
return apierSv1.DataManager.SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
@@ -173,11 +173,11 @@ func (apierSv1 *APIerSv1) ResetAccountActionTriggers(attr *AttrResetAccountActio
|
||||
}
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
var account *engine.Account
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
if acc, err := apierSv1.DataManager.GetAccount(accID); err == nil {
|
||||
account = acc
|
||||
} else {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for _, at := range account.ActionTriggers {
|
||||
if (attr.UniqueID == "" || at.UniqueID == attr.UniqueID) &&
|
||||
@@ -190,7 +190,7 @@ func (apierSv1 *APIerSv1) ResetAccountActionTriggers(attr *AttrResetAccountActio
|
||||
if attr.Executed == false {
|
||||
account.ExecuteActionTriggers(nil)
|
||||
}
|
||||
return 0, apierSv1.DataManager.SetAccount(account)
|
||||
return apierSv1.DataManager.SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
@@ -361,17 +361,17 @@ func (apierSv1 *APIerSv1) SetAccountActionTriggers(attr *AttrSetAccountActionTri
|
||||
}
|
||||
accID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
var account *engine.Account
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
if acc, err := apierSv1.DataManager.GetAccount(accID); err == nil {
|
||||
account = acc
|
||||
} else {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
var foundOne bool
|
||||
for _, at := range account.ActionTriggers {
|
||||
if updated, err := attr.UpdateActionTrigger(at,
|
||||
apierSv1.Config.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
} else if updated && !foundOne {
|
||||
foundOne = true
|
||||
}
|
||||
@@ -380,13 +380,13 @@ func (apierSv1 *APIerSv1) SetAccountActionTriggers(attr *AttrSetAccountActionTri
|
||||
at := new(engine.ActionTrigger)
|
||||
if updated, err := attr.UpdateActionTrigger(at,
|
||||
apierSv1.Config.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
} else if updated { // Adding a new AT
|
||||
account.ActionTriggers = append(account.ActionTriggers, at)
|
||||
}
|
||||
}
|
||||
account.ExecuteActionTriggers(nil)
|
||||
return 0, apierSv1.DataManager.SetAccount(account)
|
||||
return apierSv1.DataManager.SetAccount(account)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
@@ -587,14 +587,14 @@ func (apierSv1 *APIerSv1) AddTriggeredAction(attr AttrAddActionTrigger, reply *s
|
||||
at.Balance.SharedGroups = &utils.StringMap{attr.BalanceSharedGroup: true}
|
||||
}
|
||||
acntID := utils.ConcatenatedKey(tnt, attr.Account)
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
acnt, err := apierSv1.DataManager.GetAccount(acntID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
acnt.ActionTriggers = append(acnt.ActionTriggers, at)
|
||||
|
||||
return 0, apierSv1.DataManager.SetAccount(acnt)
|
||||
return apierSv1.DataManager.SetAccount(acnt)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+acntID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -138,7 +138,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
dirtyActionPlans := make(map[string]*engine.ActionPlan)
|
||||
var ub *engine.Account
|
||||
var schedNeedsReload bool
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
if bal, _ := apiv2.DataManager.GetAccount(accID); bal != nil {
|
||||
ub = bal
|
||||
} else { // Not found in db, create it here
|
||||
@@ -146,10 +146,10 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
ID: accID,
|
||||
}
|
||||
}
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
acntAPids, err := apiv2.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if attr.ActionPlansOverwrite {
|
||||
// clean previous action plans
|
||||
@@ -161,7 +161,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
}
|
||||
ap, err := apiv2.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
delete(ap.AccountIDs, accID)
|
||||
dirtyActionPlans[apID] = ap
|
||||
@@ -171,7 +171,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
for _, apID := range attr.ActionPlanIDs {
|
||||
ap, err := apiv2.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
// create tasks
|
||||
var schedTasks int // keep count on the number of scheduled tasks so we can compare with actions needed
|
||||
@@ -183,7 +183,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
ActionsID: at.ActionsID,
|
||||
}
|
||||
if err = apiv2.DataManager.DataDB().PushTask(t); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
schedTasks++
|
||||
}
|
||||
@@ -208,20 +208,20 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
apIDs := make([]string, 0, len(dirtyActionPlans))
|
||||
for actionPlanID, ap := range dirtyActionPlans {
|
||||
if err := apiv2.DataManager.SetActionPlan(actionPlanID, ap, true, utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
apIDs = append(apIDs, actionPlanID)
|
||||
}
|
||||
if err := apiv2.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
return 0, apiv2.ConnMgr.Call(apiv2.Config.ApierCfg().CachesConns, nil,
|
||||
return apiv2.ConnMgr.Call(apiv2.Config.ApierCfg().CachesConns, nil,
|
||||
utils.CacheSv1ReloadCache, &utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.AccountActionPlanIDs: {accID}, utils.ActionPlanIDs: apIDs},
|
||||
}, reply)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
|
||||
if attr.ActionTriggerOverwrite {
|
||||
@@ -230,7 +230,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
for _, actionTriggerID := range attr.ActionTriggerIDs {
|
||||
atrs, err := apiv2.DataManager.GetActionTriggers(actionTriggerID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for _, at := range atrs {
|
||||
var found bool
|
||||
@@ -254,7 +254,7 @@ func (apiv2 *APIerSv2) SetAccount(attr *AttrSetAccount, reply *string) error {
|
||||
ub.Disabled = dis
|
||||
}
|
||||
// All prepared, save account
|
||||
return 0, apiv2.DataManager.SetAccount(ub)
|
||||
return apiv2.DataManager.SetAccount(ub)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
@@ -93,8 +93,8 @@ func (apiv2 *APIerSv2) LoadAccountActions(attrs *AttrLoadAccountActions, reply *
|
||||
}
|
||||
tpAa := &utils.TPAccountActions{TPid: attrs.TPid}
|
||||
tpAa.SetAccountActionsId(attrs.AccountActionsId)
|
||||
if _, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return 0, dbReader.LoadAccountActionsFiltered(tpAa)
|
||||
if err := guardian.Guardian.Guard(func() error {
|
||||
return dbReader.LoadAccountActionsFiltered(tpAa)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.AccountActionsId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ func (acc *Account) setBalanceAction(a *Action) error {
|
||||
}
|
||||
// modify if necessary the shared groups here
|
||||
if !found || !previousSharedGroups.Equal(balance.SharedGroups) {
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
i := 0
|
||||
for sgID := range balance.SharedGroups {
|
||||
// add shared group member
|
||||
@@ -176,7 +176,7 @@ func (acc *Account) setBalanceAction(a *Action) error {
|
||||
}
|
||||
i++
|
||||
}
|
||||
return 0, nil
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, balance.SharedGroups.Slice()...)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -246,7 +246,7 @@ func (acc *Account) debitBalanceAction(a *Action, reset, resetIfNegative bool) e
|
||||
}
|
||||
}
|
||||
acc.BalanceMap[balanceType] = append(acc.BalanceMap[balanceType], bClone)
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err := guardian.Guardian.Guard(func() error {
|
||||
sgs := make([]string, len(bClone.SharedGroups))
|
||||
i := 0
|
||||
for sgID := range bClone.SharedGroups {
|
||||
@@ -268,7 +268,7 @@ func (acc *Account) debitBalanceAction(a *Action, reset, resetIfNegative bool) e
|
||||
i++
|
||||
}
|
||||
dm.CacheDataFromDB(utils.SharedGroupPrefix, sgs, true)
|
||||
return 0, nil
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, bClone.SharedGroups.Slice()...)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -555,40 +555,36 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return guardian.Guardian.Guard(func() error {
|
||||
acntAPids, err := dm.GetAccountActionPlans(accID, true, true, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err))
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
for _, apID := range acntAPids {
|
||||
ap, err := dm.GetActionPlan(apID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Could not retrieve action plan: %s: %v", apID, err))
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
delete(ap.AccountIDs, accID)
|
||||
if err := dm.SetActionPlan(apID, ap, true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Could not save action plan: %s: %v", apID, err))
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = dm.CacheDataFromDB(utils.ActionPlanPrefix, acntAPids, true); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if err = dm.RemAccountActionPlans(accID, nil); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if err = dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil && err.Error() != utils.ErrNotFound.Error() {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
return 0, nil
|
||||
return nil
|
||||
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeBalanceAction(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
|
||||
@@ -202,12 +202,12 @@ func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err
|
||||
}
|
||||
var partialyExecuted bool
|
||||
for accID := range at.accountIDs {
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
err = guardian.Guardian.Guard(func() error {
|
||||
acc, err := dm.GetAccount(accID)
|
||||
if err != nil { // create account
|
||||
if err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID))
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
err = nil
|
||||
acc = &Account{
|
||||
@@ -222,7 +222,7 @@ func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err
|
||||
matched, err := acc.matchActionFilter(a.Filter)
|
||||
//log.Print("Checkng: ", a.Filter, matched)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if !matched {
|
||||
continue
|
||||
@@ -270,7 +270,7 @@ func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err
|
||||
if !transactionFailed && !removeAccountActionFound {
|
||||
dm.SetAccount(acc)
|
||||
}
|
||||
return 0, nil
|
||||
return nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
}
|
||||
//reset the error in case that the account is not found
|
||||
|
||||
@@ -682,14 +682,14 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
|
||||
|
||||
func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) {
|
||||
cd.account = nil // make sure it's not cached
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
err = guardian.Guardian.Guard(func() (_ error) {
|
||||
account, err := cd.getAccount()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
acntIDs, err := account.GetUniqueSharedGroupMembers(cd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
var lkIDs []string
|
||||
for acntID := range acntIDs {
|
||||
@@ -697,11 +697,10 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
|
||||
lkIDs = append(lkIDs, utils.AccountPrefix+acntID)
|
||||
}
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
return guardian.Guardian.Guard(func() error {
|
||||
duration, err = cd.getMaxSessionDuration(account)
|
||||
return
|
||||
return err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+cd.GetAccountKey())
|
||||
return
|
||||
}
|
||||
@@ -754,16 +753,16 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
|
||||
|
||||
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
cd.account = nil // make sure it's not cached
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
err = guardian.Guardian.Guard(func() (_ error) {
|
||||
// lock all group members
|
||||
account, err := cd.getAccount()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
initialAcnt := account.AsAccountSummary()
|
||||
acntIDs, sgerr := account.GetUniqueSharedGroupMembers(cd)
|
||||
if sgerr != nil {
|
||||
return nil, sgerr
|
||||
return sgerr
|
||||
}
|
||||
var lkIDs []string
|
||||
for acntID := range acntIDs {
|
||||
@@ -771,12 +770,12 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
lkIDs = append(lkIDs, utils.AccountPrefix+acntID)
|
||||
}
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
return guardian.Guardian.Guard(func() (err error) {
|
||||
cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount)
|
||||
if err == nil {
|
||||
cc.AccountSummary = cd.AccountSummary(initialAcnt)
|
||||
}
|
||||
return
|
||||
return err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+cd.GetAccountKey())
|
||||
@@ -789,15 +788,15 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
// by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor.
|
||||
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
cd.account = nil // make sure it's not cached
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
err = guardian.Guardian.Guard(func() (err error) {
|
||||
account, err := cd.getAccount()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
initialAcnt := account.AsAccountSummary()
|
||||
acntIDs, err := account.GetUniqueSharedGroupMembers(cd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
var lkIDs []string
|
||||
for acntID := range acntIDs {
|
||||
@@ -805,14 +804,14 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
lkIDs = append(lkIDs, utils.AccountPrefix+acntID)
|
||||
}
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
return guardian.Guardian.Guard(func() (err error) {
|
||||
remainingDuration, err := cd.getMaxSessionDuration(account)
|
||||
if err != nil && cd.GetDuration() > 0 {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
// check ForceDuartion
|
||||
if cd.ForceDuration && !account.AllowNegative && remainingDuration < cd.GetDuration() {
|
||||
return nil, utils.ErrInsufficientCredit
|
||||
return utils.ErrInsufficientCredit
|
||||
}
|
||||
if err != nil || remainingDuration == 0 {
|
||||
cc = cd.CreateCallCost()
|
||||
@@ -841,9 +840,8 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
if err == nil {
|
||||
cc.AccountSummary = cd.AccountSummary(initialAcnt)
|
||||
}
|
||||
return
|
||||
return err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkIDs...)
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+cd.GetAccountKey())
|
||||
return cc, err
|
||||
}
|
||||
@@ -911,7 +909,7 @@ func (cd *CallDescriptor) RefundIncrements() (acnt *Account, err error) {
|
||||
accMap[utils.AccountPrefix+increment.BalanceInfo.AccountID] = true
|
||||
}
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
acnt, err = cd.refundIncrements()
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accMap.Slice()...)
|
||||
@@ -954,11 +952,9 @@ func (cd *CallDescriptor) RefundRounding() (err error) {
|
||||
for _, inc := range cd.Increments {
|
||||
accMap[utils.AccountPrefix+inc.BalanceInfo.AccountID] = true
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
|
||||
err = cd.refundRounding()
|
||||
return
|
||||
return guardian.Guardian.Guard(func() error {
|
||||
return cd.refundRounding()
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, accMap.Slice()...)
|
||||
return
|
||||
}
|
||||
|
||||
// Creates a CallCost structure copying related data from CallDescriptor
|
||||
|
||||
@@ -138,17 +138,16 @@ func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
|
||||
smCost.CostDetails.Compute() // make sure the total cost reflect the increment
|
||||
lockKey := utils.MetaCDRs + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID
|
||||
if checkDuplicate {
|
||||
_, err := cdrS.guard.Guard(func() (interface{}, error) {
|
||||
return cdrS.guard.Guard(func() error {
|
||||
smCosts, err := cdrS.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
|
||||
if err != nil && err.Error() != utils.NotFoundCaps {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if len(smCosts) != 0 {
|
||||
return nil, utils.ErrExists
|
||||
return utils.ErrExists
|
||||
}
|
||||
return nil, cdrS.cdrDb.SetSMCost(smCost)
|
||||
return cdrS.cdrDb.SetSMCost(smCost)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
|
||||
return err
|
||||
}
|
||||
return cdrS.cdrDb.SetSMCost(smCost)
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
|
||||
_, err = dm.GetStatQueueProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.StatQueuePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
// guardian.Guardian.Guard(func() (_ interface{}, _ error) { // lock the get
|
||||
// guardian.Guardian.Guard(func() ( _ error) { // lock the get
|
||||
_, err = dm.GetStatQueue(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
// return
|
||||
// }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+dataID)
|
||||
@@ -1179,7 +1179,7 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
|
||||
oldSts.TTL != sqp.TTL ||
|
||||
oldSts.MinItems != sqp.MinItems ||
|
||||
(oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed this fields
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) { // we change the queue so lock it
|
||||
guardian.Guardian.Guard(func() (_ error) { // we change the queue so lock it
|
||||
var sq *StatQueue
|
||||
if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics,
|
||||
sqp.MinItems); err != nil {
|
||||
@@ -1189,7 +1189,7 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sqp.TenantID())
|
||||
} else {
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) { // we change the queue so lock it
|
||||
guardian.Guardian.Guard(func() (_ error) { // we change the queue so lock it
|
||||
oSq, errRs := dm.GetStatQueue(sqp.Tenant, sqp.ID, // do not try to get the stats queue if the configuration changed
|
||||
true, false, utils.NonTransactional)
|
||||
if errRs == utils.ErrNotFound { // the stats queue does not exist
|
||||
|
||||
@@ -36,7 +36,7 @@ func MatchingItemIDsForEvent(ev utils.MapStorage, stringFldIDs, prefixFldIDs, su
|
||||
}
|
||||
// Guard will protect the function with automatic locking
|
||||
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
if !indexedSelects {
|
||||
var keysWithID []string
|
||||
if keysWithID, err = dm.DataDB().GetKeysForPrefix(utils.CacheIndexesToPrefix[cacheID]); err != nil {
|
||||
|
||||
@@ -88,13 +88,12 @@ func AddFailedPost(expPath, format, module string, ev interface{}, opts map[stri
|
||||
// used only on replay failed post
|
||||
func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) {
|
||||
var fileContent []byte
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
if err = guardian.Guardian.Guard(func() error {
|
||||
if fileContent, err = os.ReadFile(filePath); err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
return 0, os.Remove(filePath)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
|
||||
if err != nil {
|
||||
return os.Remove(filePath)
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath); err != nil {
|
||||
return
|
||||
}
|
||||
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
|
||||
@@ -126,17 +125,16 @@ func (expEv *ExportEvents) SetModule(mod string) {
|
||||
|
||||
// WriteToFile writes the events to file
|
||||
func (expEv *ExportEvents) WriteToFile(filePath string) (err error) {
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return guardian.Guardian.Guard(func() error {
|
||||
fileOut, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
encd := gob.NewEncoder(fileOut)
|
||||
err = encd.Encode(expEv)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
return err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
|
||||
return
|
||||
}
|
||||
|
||||
// AddEvent adds one event
|
||||
|
||||
@@ -97,14 +97,16 @@ func (rs *Responder) GetCost(arg *CallDescriptorWithAPIOpts, reply *CallCost) (e
|
||||
if !rs.usageAllowed(arg.ToR, arg.GetDuration()) {
|
||||
return utils.ErrMaxUsageExceeded
|
||||
}
|
||||
r, e := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
return arg.GetCost()
|
||||
var r *CallCost
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
r, err = arg.GetCost()
|
||||
return
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+arg.GetAccountKey())
|
||||
if r != nil {
|
||||
*reply = *r.(*CallCost)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if e != nil {
|
||||
return e
|
||||
if r != nil {
|
||||
*reply = *r
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -128,12 +130,11 @@ func (rs *Responder) GetCostOnRatingPlans(arg *utils.GetCostOnRatingPlansArgs, r
|
||||
},
|
||||
}
|
||||
var cc *CallCost
|
||||
if _, errGuard := guardian.Guardian.Guard(func() (_ interface{}, errGuard error) { // prevent cache data concurrency
|
||||
|
||||
if errGuard := guardian.Guardian.Guard(func() (errGuard error) { // prevent cache data concurrency
|
||||
// force cache set so it can be picked by calldescriptor for cost calculation
|
||||
if errGuard := Cache.Set(utils.CacheRatingProfilesTmp, rPrfl.Id, rPrfl, nil,
|
||||
true, utils.NonTransactional); errGuard != nil {
|
||||
return nil, errGuard
|
||||
return errGuard
|
||||
}
|
||||
cd := &CallDescriptor{
|
||||
Category: utils.MetaTmp,
|
||||
@@ -146,11 +147,8 @@ func (rs *Responder) GetCostOnRatingPlans(arg *utils.GetCostOnRatingPlansArgs, r
|
||||
DurationIndex: arg.Usage,
|
||||
}
|
||||
cc, err = cd.GetCost()
|
||||
if errGuard := Cache.Remove(utils.CacheRatingProfilesTmp, rPrfl.Id,
|
||||
true, utils.NonTransactional); errGuard != nil { // Remove here so we don't overload memory
|
||||
return nil, errGuard
|
||||
}
|
||||
return
|
||||
return Cache.Remove(utils.CacheRatingProfilesTmp, rPrfl.Id,
|
||||
true, utils.NonTransactional) // Remove here so we don't overload memory
|
||||
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ConcatenatedKey(utils.CacheRatingProfilesTmp, rPrfl.Id)); errGuard != nil {
|
||||
return errGuard
|
||||
|
||||
@@ -95,7 +95,7 @@ func (sS *StatService) storeStats() {
|
||||
if sID == "" {
|
||||
break // no more keys, backup completed
|
||||
}
|
||||
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s",
|
||||
@@ -173,7 +173,7 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string,
|
||||
continue
|
||||
}
|
||||
var sq *StatQueue
|
||||
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "")
|
||||
return
|
||||
}, sS.cgrcfg.GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sqPrfl.TenantID())
|
||||
@@ -246,7 +246,7 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
|
||||
|
||||
func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) {
|
||||
var removed int
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -296,7 +296,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
|
||||
var withErrors bool
|
||||
for _, sq := range matchSQs {
|
||||
stsIDs = append(stsIDs, sq.ID)
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm)
|
||||
return
|
||||
}, sS.cgrcfg.GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sq.TenantID())
|
||||
@@ -505,7 +505,7 @@ func (sS *StatService) StartLoop() {
|
||||
// V1ResetStatQueue resets the stat queue
|
||||
func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (err error) {
|
||||
var sq *StatQueue
|
||||
guardian.Guardian.Guard(func() (_ interface{}, _ error) {
|
||||
guardian.Guardian.Guard(func() (_ error) {
|
||||
if sq, err = sS.dm.GetStatQueue(tntID.Tenant, tntID.ID,
|
||||
true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
|
||||
@@ -1123,7 +1123,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance,
|
||||
if kv.Value != nil {
|
||||
existingLoadHistory = kv.Value
|
||||
}
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it
|
||||
err := guardian.Guardian.Guard(func() error { // Make sure we do it locked since other instance can modify history while we read it
|
||||
// insert on first position
|
||||
existingLoadHistory = append(existingLoadHistory, nil)
|
||||
copy(existingLoadHistory[1:], existingLoadHistory[0:])
|
||||
@@ -1134,7 +1134,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance,
|
||||
if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one
|
||||
existingLoadHistory = existingLoadHistory[:loadHistSize]
|
||||
}
|
||||
return nil, ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(ColLht).UpdateOne(sctx, bson.M{"key": utils.LoadInstKey},
|
||||
bson.M{"$set": struct {
|
||||
Key string
|
||||
|
||||
@@ -572,17 +572,17 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize
|
||||
if marshaled, err = rs.ms.Marshal(&ldInst); err != nil {
|
||||
return
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it
|
||||
err = guardian.Guardian.Guard(func() error { // Make sure we do it locked since other instance can modify history while we read it
|
||||
var histLen int
|
||||
if err := rs.Cmd(&histLen, redis_LLEN, utils.LoadInstKey); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one
|
||||
if err = rs.Cmd(nil, redis_RPOP, utils.LoadInstKey); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil, rs.Cmd(nil, redis_LPUSH, utils.LoadInstKey, string(marshaled))
|
||||
return rs.Cmd(nil, redis_LPUSH, utils.LoadInstKey, string(marshaled))
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.LoadInstKey)
|
||||
|
||||
if errCh := Cache.Remove(utils.LoadInstKey, "",
|
||||
|
||||
@@ -29,19 +29,24 @@ import (
|
||||
// Guardian is the global package variable
|
||||
var Guardian = &GuardianLocker{
|
||||
locks: make(map[string]*itemLock),
|
||||
refs: make(map[string][]string)}
|
||||
refs: make(map[string]*refObj)}
|
||||
|
||||
type itemLock struct {
|
||||
lk chan struct{} //better with mutex
|
||||
cnt int64
|
||||
}
|
||||
|
||||
type refObj struct {
|
||||
refs []string
|
||||
tm *time.Timer
|
||||
}
|
||||
|
||||
// GuardianLocker is an optimized locking system per locking key
|
||||
type GuardianLocker struct {
|
||||
locks map[string]*itemLock
|
||||
lkMux sync.Mutex // protects the locks
|
||||
refs map[string][]string // used in case of remote locks
|
||||
refsMux sync.RWMutex // protects the map
|
||||
lkMux sync.Mutex // protects the locks
|
||||
refs map[string]*refObj // used in case of remote locks
|
||||
refsMux sync.RWMutex // protects the map
|
||||
}
|
||||
|
||||
func (gl *GuardianLocker) lockItem(itmID string) {
|
||||
@@ -78,11 +83,11 @@ func (gl *GuardianLocker) unlockItem(itmID string) {
|
||||
delete(gl.locks, itmID)
|
||||
}
|
||||
gl.lkMux.Unlock()
|
||||
itmLock.lk <- struct{}{} //the unlock should be above the gl.Lock
|
||||
itmLock.lk <- struct{}{}
|
||||
}
|
||||
|
||||
// lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking)
|
||||
func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string {
|
||||
func (gl *GuardianLocker) lockWithReference(refID string, timeout time.Duration, lkIDs ...string) string {
|
||||
var refEmpty bool
|
||||
if refID == "" {
|
||||
refEmpty = true
|
||||
@@ -97,7 +102,18 @@ func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string
|
||||
return "" // no locking was done
|
||||
}
|
||||
}
|
||||
gl.refs[refID] = lkIDs
|
||||
var tm *time.Timer
|
||||
if timeout != 0 {
|
||||
tm = time.AfterFunc(timeout, func() {
|
||||
if lkIDs := gl.unlockWithReference(refID); len(lkIDs) != 0 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Guardian> force timing-out locks: %+v", lkIDs))
|
||||
}
|
||||
})
|
||||
}
|
||||
gl.refs[refID] = &refObj{
|
||||
refs: lkIDs,
|
||||
tm: tm,
|
||||
}
|
||||
gl.refsMux.Unlock()
|
||||
// execute the real locks
|
||||
for _, lk := range lkIDs {
|
||||
@@ -111,15 +127,18 @@ func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string
|
||||
func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) {
|
||||
gl.lockItem(refID)
|
||||
gl.refsMux.Lock()
|
||||
lkIDs, has := gl.refs[refID] // this value is local and not sent back
|
||||
|
||||
ref, has := gl.refs[refID]
|
||||
if !has {
|
||||
gl.refsMux.Unlock()
|
||||
gl.unlockItem(refID)
|
||||
return
|
||||
}
|
||||
if ref.tm != nil {
|
||||
ref.tm.Stop()
|
||||
}
|
||||
delete(gl.refs, refID)
|
||||
gl.refsMux.Unlock()
|
||||
lkIDs = ref.refs
|
||||
for _, lk := range lkIDs {
|
||||
gl.unlockItem(lk)
|
||||
}
|
||||
@@ -128,58 +147,41 @@ func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) {
|
||||
}
|
||||
|
||||
// Guard executes the handler between locks
|
||||
func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { // do we need the interface here as a reply?
|
||||
func (gl *GuardianLocker) Guard(handler func() error, timeout time.Duration, lockIDs ...string) (err error) { // do we need the interface here as a reply?
|
||||
for _, lockID := range lockIDs {
|
||||
gl.lockItem(lockID)
|
||||
}
|
||||
rplyChan := make(chan interface{}) // make them buffered in order to not have a gorutine sitting on just because there is nobody to read from them
|
||||
errChan := make(chan error)
|
||||
go func(rplyChan chan interface{}, errChan chan error) {
|
||||
// execute
|
||||
if rply, err := handler(); err != nil {
|
||||
errChan <- err
|
||||
} else {
|
||||
rplyChan <- rply
|
||||
}
|
||||
}(rplyChan, errChan)
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
errChan <- handler()
|
||||
}()
|
||||
if timeout > 0 { // wait with timeout
|
||||
tm := time.NewTimer(timeout)
|
||||
select {
|
||||
case err = <-errChan:
|
||||
case reply = <-rplyChan:
|
||||
case <-time.After(timeout):
|
||||
close(errChan)
|
||||
tm.Stop()
|
||||
case <-tm.C:
|
||||
utils.Logger.Warning(fmt.Sprintf("<Guardian> force timing-out locks: %+v", lockIDs))
|
||||
}
|
||||
} else { // a bit dangerous but wait till handler finishes
|
||||
select {
|
||||
case err = <-errChan:
|
||||
case reply = <-rplyChan:
|
||||
}
|
||||
err = <-errChan
|
||||
close(errChan)
|
||||
}
|
||||
for _, lockID := range lockIDs {
|
||||
gl.unlockItem(lockID)
|
||||
}
|
||||
// consider closing the return chanels if there is no timout
|
||||
return
|
||||
}
|
||||
|
||||
// GuardIDs aquires a lock for duration
|
||||
// returns the reference ID for the lock group aquired
|
||||
func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) {
|
||||
retRefID = gl.lockWithReference(refID, lkIDs)
|
||||
if timeout != 0 && retRefID != "" { // we should consider using time.AfterFunc and store the timer
|
||||
go func() {
|
||||
time.Sleep(timeout)
|
||||
lkIDs := gl.unlockWithReference(retRefID)
|
||||
if len(lkIDs) != 0 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Guardian> force timing-out locks: %+v", lkIDs))
|
||||
}
|
||||
}()
|
||||
}
|
||||
return
|
||||
func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) string {
|
||||
return gl.lockWithReference(refID, timeout, lkIDs...)
|
||||
}
|
||||
|
||||
// UnguardIDs attempts to unlock a set of locks based on their reference ID received on lock
|
||||
func (gl *GuardianLocker) UnguardIDs(refID string) (lkIDs []string) {
|
||||
func (gl *GuardianLocker) UnguardIDs(refID string) (_ []string) {
|
||||
if refID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,180 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package guardian
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Guardian is the global package variable
|
||||
var Guardian2 = &GuardianLocker2{
|
||||
locks: make(map[string]*itemLock2),
|
||||
refs: make(map[string]*refObj)}
|
||||
|
||||
type itemLock2 struct {
|
||||
sync.Mutex
|
||||
cnt int64
|
||||
}
|
||||
|
||||
type refObj struct {
|
||||
refs []string
|
||||
tm *time.Timer
|
||||
}
|
||||
|
||||
// GuardianLocker2 is an optimized locking system per locking key
|
||||
type GuardianLocker2 struct {
|
||||
locks map[string]*itemLock2
|
||||
lkMux sync.Mutex // protects the locks
|
||||
refs map[string]*refObj // used in case of remote locks
|
||||
refsMux sync.RWMutex // protects the map
|
||||
}
|
||||
|
||||
func (gl *GuardianLocker2) lockItem(itmID string) {
|
||||
if itmID == "" {
|
||||
return
|
||||
}
|
||||
gl.lkMux.Lock()
|
||||
itmLock, exists := gl.locks[itmID]
|
||||
if !exists {
|
||||
itmLock = new(itemLock2)
|
||||
gl.locks[itmID] = itmLock
|
||||
}
|
||||
itmLock.cnt++
|
||||
gl.lkMux.Unlock()
|
||||
itmLock.Lock()
|
||||
}
|
||||
|
||||
func (gl *GuardianLocker2) unlockItem(itmID string) {
|
||||
gl.lkMux.Lock()
|
||||
itmLock, exists := gl.locks[itmID]
|
||||
if !exists {
|
||||
gl.lkMux.Unlock()
|
||||
return
|
||||
}
|
||||
itmLock.cnt--
|
||||
if itmLock.cnt == 0 {
|
||||
delete(gl.locks, itmID)
|
||||
}
|
||||
itmLock.Unlock()
|
||||
gl.lkMux.Unlock()
|
||||
}
|
||||
|
||||
// lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking)
|
||||
func (gl *GuardianLocker2) lockWithReference(refID string, timeout time.Duration, lkIDs ...string) string {
|
||||
var refEmpty bool
|
||||
if refID == "" {
|
||||
refEmpty = true
|
||||
refID = utils.GenUUID()
|
||||
}
|
||||
gl.lockItem(refID) // make sure we only process one simultaneous refID at the time, otherwise checking already used refID is not reliable
|
||||
gl.refsMux.Lock()
|
||||
if !refEmpty {
|
||||
if _, has := gl.refs[refID]; has {
|
||||
gl.refsMux.Unlock()
|
||||
gl.unlockItem(refID)
|
||||
return "" // no locking was done
|
||||
}
|
||||
}
|
||||
var tm *time.Timer
|
||||
if timeout != 0 {
|
||||
tm = time.AfterFunc(timeout, func() {
|
||||
if lkIDs := gl.unlockWithReference(refID); len(lkIDs) != 0 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Guardian> force timing-out locks: %+v", lkIDs))
|
||||
}
|
||||
})
|
||||
}
|
||||
gl.refs[refID] = &refObj{
|
||||
refs: lkIDs,
|
||||
tm: tm,
|
||||
}
|
||||
gl.refsMux.Unlock()
|
||||
// execute the real locks
|
||||
for _, lk := range lkIDs {
|
||||
gl.lockItem(lk)
|
||||
}
|
||||
gl.unlockItem(refID)
|
||||
return refID
|
||||
}
|
||||
|
||||
// unlockWithReference will unlock based on the reference ID
|
||||
func (gl *GuardianLocker2) unlockWithReference(refID string) (lkIDs []string) {
|
||||
gl.lockItem(refID)
|
||||
gl.refsMux.Lock()
|
||||
ref, has := gl.refs[refID]
|
||||
if !has {
|
||||
gl.refsMux.Unlock()
|
||||
gl.unlockItem(refID)
|
||||
return
|
||||
}
|
||||
if ref.tm != nil {
|
||||
ref.tm.Stop()
|
||||
}
|
||||
delete(gl.refs, refID)
|
||||
gl.refsMux.Unlock()
|
||||
lkIDs = ref.refs
|
||||
for _, lk := range lkIDs {
|
||||
gl.unlockItem(lk)
|
||||
}
|
||||
gl.unlockItem(refID)
|
||||
return
|
||||
}
|
||||
|
||||
// Guard executes the handler between locks
|
||||
func (gl *GuardianLocker2) Guard(handler func() error, timeout time.Duration, lockIDs ...string) (err error) { // do we need the interface here as a reply?
|
||||
for _, lockID := range lockIDs {
|
||||
gl.lockItem(lockID)
|
||||
}
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
errChan <- handler()
|
||||
}()
|
||||
if timeout > 0 { // wait with timeout
|
||||
select {
|
||||
case err = <-errChan:
|
||||
close(errChan)
|
||||
case <-time.After(timeout):
|
||||
utils.Logger.Warning(fmt.Sprintf("<Guardian> force timing-out locks: %+v", lockIDs))
|
||||
}
|
||||
} else { // a bit dangerous but wait till handler finishes
|
||||
err = <-errChan
|
||||
close(errChan)
|
||||
}
|
||||
for _, lockID := range lockIDs {
|
||||
gl.unlockItem(lockID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GuardIDs aquires a lock for duration
|
||||
// returns the reference ID for the lock group aquired
|
||||
func (gl *GuardianLocker2) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) string {
|
||||
return gl.lockWithReference(refID, timeout, lkIDs...)
|
||||
}
|
||||
|
||||
// UnguardIDs attempts to unlock a set of locks based on their reference ID received on lock
|
||||
func (gl *GuardianLocker2) UnguardIDs(refID string) (_ []string) {
|
||||
if refID == "" {
|
||||
return
|
||||
}
|
||||
return gl.unlockWithReference(refID)
|
||||
}
|
||||
@@ -1,346 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package guardian
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func delayHandler2() error {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Forks 3 groups of workers and makes sure that the time for execution is the one we expect for all 15 goroutines (with 100ms )
|
||||
func TestGuardian2MultipleKeys(t *testing.T) {
|
||||
tStart := time.Now()
|
||||
maxIter := 5
|
||||
sg := new(sync.WaitGroup)
|
||||
keys := []string{"test1", "test2", "test3"}
|
||||
for i := 0; i < maxIter; i++ {
|
||||
for _, key := range keys {
|
||||
sg.Add(1)
|
||||
go func(key string) {
|
||||
Guardian2.Guard(delayHandler2, 0, key)
|
||||
sg.Done()
|
||||
}(key)
|
||||
}
|
||||
}
|
||||
sg.Wait()
|
||||
mustExecDur := time.Duration(maxIter*100) * time.Millisecond
|
||||
if execTime := time.Now().Sub(tStart); execTime < mustExecDur ||
|
||||
execTime > mustExecDur+100*time.Millisecond {
|
||||
t.Errorf("Execution took: %v", execTime)
|
||||
}
|
||||
Guardian2.lkMux.Lock()
|
||||
for _, key := range keys {
|
||||
if _, hasKey := Guardian2.locks[key]; hasKey {
|
||||
t.Errorf("Possible memleak for key: %s", key)
|
||||
}
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
}
|
||||
|
||||
func TestGuardian2Timeout(t *testing.T) {
|
||||
tStart := time.Now()
|
||||
maxIter := 5
|
||||
sg := new(sync.WaitGroup)
|
||||
keys := []string{"test1", "test2", "test3"}
|
||||
for i := 0; i < maxIter; i++ {
|
||||
for _, key := range keys {
|
||||
sg.Add(1)
|
||||
go func(key string) {
|
||||
Guardian2.Guard(delayHandler2, 10*time.Millisecond, key)
|
||||
sg.Done()
|
||||
}(key)
|
||||
}
|
||||
}
|
||||
sg.Wait()
|
||||
mustExecDur := time.Duration(maxIter*10) * time.Millisecond
|
||||
if execTime := time.Now().Sub(tStart); execTime < mustExecDur ||
|
||||
execTime > mustExecDur+100*time.Millisecond {
|
||||
t.Errorf("Execution took: %v", execTime)
|
||||
}
|
||||
Guardian2.lkMux.Lock()
|
||||
for _, key := range keys {
|
||||
if _, hasKey := Guardian2.locks[key]; hasKey {
|
||||
t.Error("Possible memleak")
|
||||
}
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
}
|
||||
|
||||
func TestGuardian2GuardIDs(t *testing.T) {
|
||||
|
||||
//lock with 3 keys
|
||||
lockIDs := []string{"test1", "test2", "test3"}
|
||||
// make sure the keys are not in guardian before lock
|
||||
Guardian2.lkMux.Lock()
|
||||
for _, lockID := range lockIDs {
|
||||
if _, hasKey := Guardian2.locks[lockID]; hasKey {
|
||||
t.Errorf("Unexpected lockID found: %s", lockID)
|
||||
}
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
// lock 3 items
|
||||
tStart := time.Now()
|
||||
lockDur := 2 * time.Millisecond
|
||||
Guardian2.GuardIDs("", lockDur, lockIDs...)
|
||||
Guardian2.lkMux.Lock()
|
||||
for _, lockID := range lockIDs {
|
||||
if itmLock, hasKey := Guardian2.locks[lockID]; !hasKey {
|
||||
t.Errorf("Cannot find lock for lockID: %s", lockID)
|
||||
} else if itmLock.cnt != 1 {
|
||||
t.Errorf("Unexpected itmLock found: %+v", itmLock)
|
||||
}
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
secLockDur := time.Millisecond
|
||||
// second lock to test counter
|
||||
go Guardian2.GuardIDs("", secLockDur, lockIDs[1:]...)
|
||||
time.Sleep(30 * time.Microsecond) // give time for goroutine to lock
|
||||
// check if counters were properly increased
|
||||
Guardian2.lkMux.Lock()
|
||||
lkID := lockIDs[0]
|
||||
eCnt := int64(1)
|
||||
if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey {
|
||||
t.Errorf("Cannot find lock for lockID: %s", lkID)
|
||||
} else if itmLock.cnt != eCnt {
|
||||
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
|
||||
}
|
||||
lkID = lockIDs[1]
|
||||
eCnt = int64(2)
|
||||
if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey {
|
||||
t.Errorf("Cannot find lock for lockID: %s", lkID)
|
||||
} else if itmLock.cnt != eCnt {
|
||||
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
|
||||
}
|
||||
lkID = lockIDs[2]
|
||||
eCnt = int64(1) // we did not manage to increase it yet since it did not pass first lock
|
||||
if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey {
|
||||
t.Errorf("Cannot find lock for lockID: %s", lkID)
|
||||
} else if itmLock.cnt != eCnt {
|
||||
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
time.Sleep(lockDur + secLockDur + 50*time.Millisecond) // give time to unlock before proceeding
|
||||
|
||||
// make sure all counters were removed
|
||||
for _, lockID := range lockIDs {
|
||||
if _, hasKey := Guardian2.locks[lockID]; hasKey {
|
||||
t.Errorf("Unexpected lockID found: %s", lockID)
|
||||
}
|
||||
}
|
||||
// test lock without timer
|
||||
refID := Guardian2.GuardIDs("", 0, lockIDs...)
|
||||
|
||||
if totalLockDur := time.Now().Sub(tStart); totalLockDur < lockDur {
|
||||
t.Errorf("Lock duration too small")
|
||||
}
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
// making sure the items stay locked
|
||||
Guardian2.lkMux.Lock()
|
||||
if len(Guardian2.locks) != 3 {
|
||||
t.Errorf("locks should have 3 elements, have: %+v", Guardian2.locks)
|
||||
}
|
||||
for _, lkID := range lockIDs {
|
||||
if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey {
|
||||
t.Errorf("Cannot find lock for lockID: %s", lkID)
|
||||
} else if itmLock.cnt != 1 {
|
||||
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
|
||||
}
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
Guardian2.UnguardIDs(refID)
|
||||
// make sure items were unlocked
|
||||
Guardian2.lkMux.Lock()
|
||||
if len(Guardian2.locks) != 0 {
|
||||
t.Errorf("locks should have 0 elements, has: %+v", Guardian2.locks)
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
}
|
||||
|
||||
// TestGuardian2GuardIDsConcurrent executes GuardIDs concurrently
|
||||
func TestGuardian2GuardIDsConcurrent(t *testing.T) {
|
||||
maxIter := 500
|
||||
sg := new(sync.WaitGroup)
|
||||
keys := []string{"test1", "test2", "test3"}
|
||||
refID := utils.GenUUID()
|
||||
for i := 0; i < maxIter; i++ {
|
||||
sg.Add(1)
|
||||
go func() {
|
||||
if retRefID := Guardian2.GuardIDs(refID, 0, keys...); retRefID != "" {
|
||||
if lkIDs := Guardian2.UnguardIDs(refID); !reflect.DeepEqual(keys, lkIDs) {
|
||||
t.Errorf("expecting: %+v, received: %+v", keys, lkIDs)
|
||||
}
|
||||
}
|
||||
sg.Done()
|
||||
}()
|
||||
}
|
||||
sg.Wait()
|
||||
|
||||
Guardian2.lkMux.Lock()
|
||||
if len(Guardian2.locks) != 0 {
|
||||
t.Errorf("Possible memleak for locks: %+v", Guardian2.locks)
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
Guardian2.refsMux.Lock()
|
||||
if len(Guardian2.refs) != 0 {
|
||||
t.Errorf("Possible memleak for refs: %+v", Guardian2.refs)
|
||||
}
|
||||
Guardian2.refsMux.Unlock()
|
||||
}
|
||||
|
||||
func TestGuardian2GuardIDsTimeoutConcurrent(t *testing.T) {
|
||||
maxIter := 50
|
||||
sg := new(sync.WaitGroup)
|
||||
keys := []string{"test1", "test2", "test3"}
|
||||
refID := utils.GenUUID()
|
||||
for i := 0; i < maxIter; i++ {
|
||||
sg.Add(1)
|
||||
go func() {
|
||||
Guardian2.GuardIDs(refID, time.Microsecond, keys...)
|
||||
sg.Done()
|
||||
}()
|
||||
}
|
||||
sg.Wait()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
Guardian2.lkMux.Lock()
|
||||
if len(Guardian2.locks) != 0 {
|
||||
t.Errorf("Possible memleak for locks: %+v", Guardian2.locks)
|
||||
}
|
||||
Guardian2.lkMux.Unlock()
|
||||
Guardian2.refsMux.Lock()
|
||||
if len(Guardian2.refs) != 0 {
|
||||
t.Errorf("Possible memleak for refs: %+v", Guardian2.refs)
|
||||
}
|
||||
Guardian2.refsMux.Unlock()
|
||||
}
|
||||
|
||||
// BenchmarkGuard-8 200000 13759 ns/op
|
||||
func BenchmarkGuard2(b *testing.B) {
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(b.N * 3)
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
go func() {
|
||||
Guardian2.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return nil
|
||||
}, 0, "1")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
Guardian2.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return nil
|
||||
}, 0, "2")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
Guardian2.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return nil
|
||||
}, 0, "1")
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// BenchmarkGuardian-8 1000000 5794 ns/op
|
||||
func BenchmarkGuardian2(b *testing.B) {
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(b.N)
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
go func(n int) {
|
||||
Guardian2.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return nil
|
||||
}, 0, strconv.Itoa(n))
|
||||
wg.Done()
|
||||
}(n)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// BenchmarkGuardIDs-8 1000000 8732 ns/op
|
||||
func BenchmarkGuardIDs2(b *testing.B) {
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(b.N)
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
go func(i int) {
|
||||
if refID := Guardian2.GuardIDs("", 0, strconv.Itoa(i)); refID != "" {
|
||||
time.Sleep(time.Microsecond)
|
||||
Guardian2.UnguardIDs(refID)
|
||||
}
|
||||
wg.Done()
|
||||
}(n)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestGuardian2LockItemUnlockItem(t *testing.T) {
|
||||
//for coverage purposes
|
||||
itemID := utils.EmptyString
|
||||
Guardian2.lockItem(itemID)
|
||||
Guardian2.unlockItem(itemID)
|
||||
if itemID != utils.EmptyString {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, itemID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuardian2LockUnlockWithReference(t *testing.T) {
|
||||
//for coverage purposes
|
||||
refID := utils.EmptyString
|
||||
Guardian2.lockWithReference(refID, 0)
|
||||
Guardian2.unlockWithReference(refID)
|
||||
if refID != utils.EmptyString {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, refID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuardian2GuardUnguardIDs(t *testing.T) {
|
||||
//for coverage purposes
|
||||
refID := utils.EmptyString
|
||||
lkIDs := []string{"test1", "test2", "test3"}
|
||||
Guardian2.GuardIDs(refID, time.Second, lkIDs...)
|
||||
Guardian2.UnguardIDs(refID)
|
||||
if refID != utils.EmptyString {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, refID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuardian2GuardUnguardIDsCase2(t *testing.T) {
|
||||
//for coverage purposes
|
||||
lkIDs := []string{"test1", "test2", "test3"}
|
||||
err := Guardian2.Guard(func() error {
|
||||
return utils.ErrNotFound
|
||||
}, 10*time.Millisecond, lkIDs...)
|
||||
if err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
|
||||
}
|
||||
}
|
||||
@@ -27,9 +27,9 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func delayHandler() (interface{}, error) {
|
||||
func delayHandler() error {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Forks 3 groups of workers and makes sure that the time for execution is the one we expect for all 15 goroutines (with 100ms )
|
||||
@@ -245,23 +245,23 @@ func BenchmarkGuard(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
go func() {
|
||||
Guardian.Guard(func() (interface{}, error) {
|
||||
Guardian.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return 0, nil
|
||||
return nil
|
||||
}, 0, "1")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
Guardian.Guard(func() (interface{}, error) {
|
||||
Guardian.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return 0, nil
|
||||
return nil
|
||||
}, 0, "2")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
Guardian.Guard(func() (interface{}, error) {
|
||||
Guardian.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return 0, nil
|
||||
return nil
|
||||
}, 0, "1")
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -277,9 +277,9 @@ func BenchmarkGuardian(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
go func(n int) {
|
||||
Guardian.Guard(func() (interface{}, error) {
|
||||
Guardian.Guard(func() error {
|
||||
time.Sleep(time.Microsecond)
|
||||
return 0, nil
|
||||
return nil
|
||||
}, 0, strconv.Itoa(n))
|
||||
wg.Done()
|
||||
}(n)
|
||||
@@ -317,7 +317,7 @@ func TestGuardianLockItemUnlockItem(t *testing.T) {
|
||||
func TestGuardianLockUnlockWithReference(t *testing.T) {
|
||||
//for coverage purposes
|
||||
refID := utils.EmptyString
|
||||
Guardian.lockWithReference(refID, []string{})
|
||||
Guardian.lockWithReference(refID, 0, []string{}...)
|
||||
Guardian.unlockWithReference(refID)
|
||||
if refID != utils.EmptyString {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, refID)
|
||||
@@ -338,8 +338,8 @@ func TestGuardianGuardUnguardIDs(t *testing.T) {
|
||||
func TestGuardianGuardUnguardIDsCase2(t *testing.T) {
|
||||
//for coverage purposes
|
||||
lkIDs := []string{"test1", "test2", "test3"}
|
||||
_, err := Guardian.Guard(func() (interface{}, error) {
|
||||
return nil, utils.ErrNotFound
|
||||
err := Guardian.Guard(func() error {
|
||||
return utils.ErrNotFound
|
||||
}, 10*time.Millisecond, lkIDs...)
|
||||
if err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
|
||||
|
||||
Reference in New Issue
Block a user