diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index b8702597c..bd5b50013 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -92,7 +92,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e return 0, err } return 0, nil - }, utils.ACTION_TIMING_PREFIX) + }, 0, utils.ACTION_TIMING_PREFIX) if err != nil { return utils.NewErrServerError(err) } @@ -147,7 +147,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r return 0, err } return 0, nil - }, balanceId) + }, 0, balanceId) if err != nil { return utils.NewErrServerError(err) } @@ -193,7 +193,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error return 0, err } return 0, nil - }, balanceId) + }, 0, balanceId) if err != nil { return utils.NewErrServerError(err) } @@ -203,7 +203,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error return 0, err } return 0, nil - }, utils.ACTION_TIMING_PREFIX) + }, 0, utils.ACTION_TIMING_PREFIX) if err != nil { return utils.NewErrServerError(err) } @@ -226,7 +226,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) return 0, err } return 0, nil - }, accountId) + }, 0, accountId) if err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 4230976d4..12aa191ee 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -741,7 +741,7 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string return 0, err } return 0, nil - }, tag) + }, 0, tag) if err != nil { *reply = err.Error() return err @@ -811,7 +811,7 @@ func (self *ApierV1) ResetTriggeredActions(attr AttrResetTriggeredAction, reply return 0, err } return 0, nil - }, accID) + }, 0, accID) if err != nil { *reply = err.Error() return err @@ -832,7 +832,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str return 0, err } return 0, nil - }, attrs.KeyId()); err != nil { + }, 0, attrs.KeyId()); err != nil { return utils.NewErrServerError(err) } // ToDo: Get the action keys loaded by dbReader so we reload only these in cache @@ -1184,7 +1184,7 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st return 0, err } return 0, nil - }, "RemoveRatingProfile") + }, 0, "RemoveRatingProfile") if err != nil { *reply = err.Error() return utils.NewErrServerError(err) diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 4d2252e5b..729b1c673 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -82,7 +82,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str return 0, err } return 0, nil - }, attrs.AccountActionsId); err != nil { + }, 0, attrs.AccountActionsId); err != nil { return utils.NewErrServerError(err) } // ToDo: Get the action keys loaded by dbReader so we reload only these in cache diff --git a/cdrc/csv.go b/cdrc/csv.go index d6aabfa7e..fa8cac9d5 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -126,7 +126,7 @@ func (self *PartialRecordsCache) dumpUnpairedRecords(fileName string) error { } delete(self.partialRecords, fileName) return nil, nil - }, fileName) + }, 0, fileName) return err } @@ -147,7 +147,7 @@ func (self *PartialRecordsCache) GetPartialRecord(accId, prefFileName string) (s cachedFilename = fName } return nil, nil - }, fName) + }, 0, fName) if cachedPartial != nil { break } @@ -169,14 +169,14 @@ func (self *PartialRecordsCache) CachePartial(fileName string, pr *PartialFlatst self.partialRecords[fileName][pr.AccId] = pr } return nil, nil - }, fileName) + }, 0, fileName) } func (self *PartialRecordsCache) UncachePartial(fileName string, pr *PartialFlatstoreRecord) { self.guard.Guard(func() (interface{}, error) { delete(self.partialRecords[fileName], pr.AccId) // Remove the record out of cache return nil, nil - }, fileName) + }, 0, fileName) } func NewCsvRecordsProcessor(csvReader *csv.Reader, cdrFormat, timezone, fileName, failedCallsPrefix string, diff --git a/engine/action_plan.go b/engine/action_plan.go index 1cf386e99..74b8278d0 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -256,7 +256,7 @@ func (at *ActionPlan) Execute() (err error) { Logger.Warning(fmt.Sprintf("Could not remove account Id: %s: %d", accId, err)) } return 0, nil - }, accId) + }, 0, accId) if err != nil { Logger.Warning(fmt.Sprintf("Error executing action timing: %v", err)) } @@ -275,7 +275,7 @@ func (at *ActionPlan) Execute() (err error) { _, err := Guardian.Guard(func() (interface{}, error) { ub, err := accountingStorage.GetAccount(accId) if err != nil { - Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId)) + Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", 0, accId)) return 0, err } else if ub.Disabled && a.ActionType != ENABLE_ACCOUNT { return 0, fmt.Errorf("Account %s is disabled", accId) @@ -285,7 +285,7 @@ func (at *ActionPlan) Execute() (err error) { //Logger.Info(fmt.Sprintf("After execute, account: %+v", ub)) accountingStorage.SetAccount(ub) return 0, nil - }, accId) + }, 0, accId) if err != nil { Logger.Warning(fmt.Sprintf("Error executing action timing: %v", err)) } diff --git a/engine/calldesc.go b/engine/calldesc.go index 715b1dfda..e7e93ed76 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -628,7 +628,7 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e if _, err := Guardian.Guard(func() (interface{}, error) { duration, err = cd.getMaxSessionDuration(account) return 0, err - }, memberIds...); err != nil { + }, 0, memberIds...); err != nil { return 0, err } } else { @@ -683,7 +683,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { Guardian.Guard(func() (interface{}, error) { cc, err = cd.debit(account, false, true) return 0, err - }, memberIds...) + }, 0, memberIds...) } else { return nil, err } @@ -719,7 +719,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { cc, err = cd.debit(account, false, true) //log.Print(balanceMap[0].Value, balanceMap[1].Value) return 0, err - }, memberIds...) + }, 0, memberIds...) } else { return nil, err } diff --git a/engine/cdrs.go b/engine/cdrs.go index 7f72fd274..f3f6f3fe2 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -115,7 +115,7 @@ func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { return nil, utils.ErrExists } return nil, self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) - }, ccl.CgrId) + }, 0, ccl.CgrId) return err } return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) diff --git a/engine/guardian.go b/engine/guardian.go index b6e2f48ad..0e11cff67 100644 --- a/engine/guardian.go +++ b/engine/guardian.go @@ -20,6 +20,7 @@ package engine import ( "sync" + "time" ) // global package variable @@ -34,7 +35,7 @@ type GuardianLock struct { mu sync.Mutex } -func (cm *GuardianLock) Guard(handler func() (interface{}, error), names ...string) (reply interface{}, err error) { +func (cm *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, names ...string) (reply interface{}, err error) { cm.mu.Lock() for _, name := range names { lock, exists := Guardian.queue[name] @@ -45,7 +46,22 @@ func (cm *GuardianLock) Guard(handler func() (interface{}, error), names ...stri lock <- true } cm.mu.Unlock() - reply, err = handler() + funcWaiter := make(chan bool) + go func() { + // execute + reply, err = handler() + funcWaiter <- true + }() + // wait with timeout + if timeout > 0 { + select { + case <-funcWaiter: + case <-time.After(timeout): + } + } else { + <-funcWaiter + } + // release for _, name := range names { lock := Guardian.queue[name] <-lock diff --git a/engine/guardian_test.go b/engine/guardian_test.go index 4a7266478..402f12d66 100644 --- a/engine/guardian_test.go +++ b/engine/guardian_test.go @@ -30,18 +30,18 @@ func ATestAccountLock(t *testing.T) { time.Sleep(1 * time.Second) log.Print("end first 1") return 0, nil - }, "1") + }, 0, "1") go Guardian.Guard(func() (interface{}, error) { log.Print("first 2") time.Sleep(1 * time.Second) log.Print("end first 2") return 0, nil - }, "2") + }, 0, "2") go Guardian.Guard(func() (interface{}, error) { log.Print("second 1") time.Sleep(1 * time.Second) log.Print("end second 1") return 0, nil - }, "1") + }, 0, "1") time.Sleep(3 * time.Second) } diff --git a/engine/responder.go b/engine/responder.go index 73cfe6697..60f830f58 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -74,7 +74,7 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { } else { r, e := Guardian.Guard(func() (interface{}, error) { return arg.GetCost() - }, arg.GetAccountKey()) + }, 0, arg.GetAccountKey()) if e != nil { return e } else if r != nil { @@ -147,7 +147,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err } else { r, e := Guardian.Guard(func() (interface{}, error) { return arg.RefundIncrements() - }, arg.GetAccountKey()) + }, 0, arg.GetAccountKey()) *reply, err = r.(float64), e } return @@ -363,7 +363,7 @@ func (rs *Responder) FlushCache(arg *CallDescriptor, reply *float64) (err error) } else { r, e := Guardian.Guard(func() (interface{}, error) { return 0, arg.FlushCache() - }, arg.GetAccountKey()) + }, 0, arg.GetAccountKey()) *reply, err = r.(float64), e } return @@ -409,7 +409,7 @@ func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *Cal _, err = Guardian.Guard(func() (interface{}, error) { err = client.Call(method, *key, reply) return reply, err - }, key.GetAccountKey()) + }, 0, key.GetAccountKey()) if err != nil { Logger.Err(fmt.Sprintf(" Got en error from rater: %v", err)) } @@ -432,7 +432,7 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float _, err = Guardian.Guard(func() (interface{}, error) { err = client.Call(method, *key, &reply) return reply, err - }, key.GetAccountKey()) + }, 0, key.GetAccountKey()) if err != nil { Logger.Info(fmt.Sprintf("Got en error from rater: %v", err)) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 85bdbbfe9..401d8641c 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -792,7 +792,7 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e } err = rs.db.Lpush(utils.LOADINST_KEY, marshaled) return nil, err - }, utils.LOADINST_KEY) + }, 0, utils.LOADINST_KEY) return err } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 668e83007..93abc0073 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -111,7 +111,7 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) { engine.Guardian.Guard(func() (interface{}, error) { storage.SetActionPlans(key, newAts) return 0, nil - }, utils.ACTION_TIMING_PREFIX) + }, 0, utils.ACTION_TIMING_PREFIX) } } sort.Sort(s.queue)