mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
further account guard simplification
This commit is contained in:
@@ -80,7 +80,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
|
||||
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
|
||||
}
|
||||
}
|
||||
_, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
_, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
ats, err := self.AccountDb.GetActionTimings(attrs.ActionPlanId)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -130,7 +130,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
|
||||
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
|
||||
}
|
||||
balanceId := utils.AccountKey(attrs.Tenant, attrs.Account, attrs.Direction)
|
||||
_, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
_, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
ub, err := self.AccountDb.GetAccount(balanceId)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -174,7 +174,7 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error {
|
||||
balanceId := utils.AccountKey(attr.Tenant, attr.Account, attr.Direction)
|
||||
var ub *engine.Account
|
||||
var ats engine.ActionPlan
|
||||
_, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
_, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
if bal, _ := self.AccountDb.GetAccount(balanceId); bal != nil {
|
||||
ub = bal
|
||||
} else { // Not found in db, create it here
|
||||
@@ -204,7 +204,7 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error {
|
||||
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
|
||||
}
|
||||
if len(ats) != 0 {
|
||||
_, err := engine.AccLock.Guard(func() (float64, error) { // ToDo: Try locking it above on read somehow
|
||||
_, err := engine.AccLock.Guard(func() (interface{}, error) { // ToDo: Try locking it above on read somehow
|
||||
if err := self.AccountDb.SetActionTimings(attr.ActionPlanId, ats); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -588,7 +588,7 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string
|
||||
}
|
||||
|
||||
tag := utils.AccountKey(attr.Tenant, attr.Account, attr.BalanceDirection)
|
||||
_, err = engine.AccLock.Guard(func() (float64, error) {
|
||||
_, err = engine.AccLock.Guard(func() (interface{}, error) {
|
||||
userBalance, err := self.AccountDb.GetAccount(tag)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -658,7 +658,7 @@ func (self *ApierV1) ResetTriggeredActions(attr AttrResetTriggeredAction, reply
|
||||
}
|
||||
}
|
||||
accID := utils.AccountKey(attr.Tenant, attr.Account, attr.Direction)
|
||||
_, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
_, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
acc, err := self.AccountDb.GetAccount(accID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -685,7 +685,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
|
||||
return fmt.Errorf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, "TPid")
|
||||
}
|
||||
dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid)
|
||||
if _, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
if _, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
if err := dbReader.LoadAccountActionsFiltered(&attrs); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
|
||||
tpAa := &utils.TPAccountActions{TPid: attrs.TPid}
|
||||
tpAa.SetAccountActionsId(attrs.AccountActionsId)
|
||||
|
||||
if _, err := engine.AccLock.Guard(func() (float64, error) {
|
||||
if _, err := engine.AccLock.Guard(func() (interface{}, error) {
|
||||
if err := dbReader.LoadAccountActionsFiltered(tpAa); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -37,21 +37,7 @@ func NewAccountLock() *AccountLock {
|
||||
return &AccountLock{queue: make(map[string]chan bool)}
|
||||
}
|
||||
|
||||
func (cm *AccountLock) GuardCallCost(handler func() (*CallCost, error), name string) (reply *CallCost, err error) {
|
||||
cm.mu.Lock()
|
||||
lock, exists := AccLock.queue[name]
|
||||
if !exists {
|
||||
lock = make(chan bool, 1)
|
||||
AccLock.queue[name] = lock
|
||||
}
|
||||
lock <- true
|
||||
cm.mu.Unlock()
|
||||
reply, err = handler()
|
||||
<-lock
|
||||
return
|
||||
}
|
||||
|
||||
func (cm *AccountLock) Guard(handler func() (float64, error), names ...string) (reply float64, err error) {
|
||||
func (cm *AccountLock) Guard(handler func() (interface{}, error), names ...string) (reply interface{}, err error) {
|
||||
cm.mu.Lock()
|
||||
for _, name := range names {
|
||||
lock, exists := AccLock.queue[name]
|
||||
|
||||
@@ -25,19 +25,19 @@ import (
|
||||
)
|
||||
|
||||
func ATestAccountLock(t *testing.T) {
|
||||
go AccLock.Guard(func() (float64, error) {
|
||||
go AccLock.Guard(func() (interface{}, error) {
|
||||
log.Print("first 1")
|
||||
time.Sleep(1 * time.Second)
|
||||
log.Print("end first 1")
|
||||
return 0, nil
|
||||
}, "1")
|
||||
go AccLock.Guard(func() (float64, error) {
|
||||
go AccLock.Guard(func() (interface{}, error) {
|
||||
log.Print("first 2")
|
||||
time.Sleep(1 * time.Second)
|
||||
log.Print("end first 2")
|
||||
return 0, nil
|
||||
}, "2")
|
||||
go AccLock.Guard(func() (float64, error) {
|
||||
go AccLock.Guard(func() (interface{}, error) {
|
||||
log.Print("second 1")
|
||||
time.Sleep(1 * time.Second)
|
||||
log.Print("end second 1")
|
||||
|
||||
@@ -257,7 +257,7 @@ func (at *ActionTiming) Execute() (err error) {
|
||||
return
|
||||
}
|
||||
for _, ubId := range at.AccountIds {
|
||||
_, err := AccLock.Guard(func() (float64, error) {
|
||||
_, err := AccLock.Guard(func() (interface{}, error) {
|
||||
ub, err := accountingStorage.GetAccount(ubId)
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId))
|
||||
|
||||
@@ -502,7 +502,7 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
|
||||
return 0, err
|
||||
} else {
|
||||
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
|
||||
AccLock.Guard(func() (float64, error) {
|
||||
AccLock.Guard(func() (interface{}, error) {
|
||||
duration, err = cd.getMaxSessionDuration(account)
|
||||
return 0, err
|
||||
}, memberIds...)
|
||||
@@ -551,7 +551,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
return nil, err
|
||||
} else {
|
||||
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
|
||||
AccLock.Guard(func() (float64, error) {
|
||||
AccLock.Guard(func() (interface{}, error) {
|
||||
cc, err = cd.debit(account, false, true)
|
||||
return 0, err
|
||||
}, memberIds...)
|
||||
@@ -573,7 +573,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
} else {
|
||||
//log.Printf("ACC: %+v", account)
|
||||
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
|
||||
AccLock.Guard(func() (float64, error) {
|
||||
AccLock.Guard(func() (interface{}, error) {
|
||||
remainingDuration, err := cd.getMaxSessionDuration(account)
|
||||
//log.Print("AFTER MAX SESSION: ", cd)
|
||||
if err != nil || remainingDuration == 0 {
|
||||
|
||||
@@ -55,13 +55,13 @@ func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
|
||||
r, e := rs.getCallCost(&arg, "Responder.GetCost")
|
||||
*reply, err = *r, e
|
||||
} else {
|
||||
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
return arg.GetCost()
|
||||
}, arg.GetAccountKey())
|
||||
if e != nil {
|
||||
return e
|
||||
} else if r != nil {
|
||||
*reply = *r
|
||||
*reply = *r.(*CallCost)
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -72,13 +72,13 @@ func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
|
||||
r, e := rs.getCallCost(&arg, "Responder.Debit")
|
||||
*reply, err = *r, e
|
||||
} else {
|
||||
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
return arg.Debit()
|
||||
}, arg.GetAccountKey())
|
||||
if e != nil {
|
||||
return e
|
||||
} else if r != nil {
|
||||
*reply = *r
|
||||
*reply = *r.(*CallCost)
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -89,13 +89,13 @@ func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) {
|
||||
r, e := rs.getCallCost(&arg, "Responder.MaxDebit")
|
||||
*reply, err = *r, e
|
||||
} else {
|
||||
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
return arg.MaxDebit()
|
||||
}, arg.GetAccountKey())
|
||||
if e != nil {
|
||||
return e
|
||||
} else if r != nil {
|
||||
*reply = *r
|
||||
*reply = *r.(*CallCost)
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -105,10 +105,10 @@ func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err e
|
||||
if rs.Bal != nil {
|
||||
*reply, err = rs.callMethod(&arg, "Responder.RefundIncrements")
|
||||
} else {
|
||||
r, e := AccLock.Guard(func() (float64, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
return arg.RefundIncrements()
|
||||
}, arg.GetAccountKey())
|
||||
*reply, err = r, e
|
||||
*reply, err = r.(float64), e
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -117,11 +117,11 @@ func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err
|
||||
if rs.Bal != nil {
|
||||
*reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime")
|
||||
} else {
|
||||
r, e := AccLock.Guard(func() (float64, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
d, err := arg.GetMaxSessionDuration()
|
||||
return float64(d), err
|
||||
}, arg.GetAccountKey())
|
||||
*reply, err = r, e
|
||||
*reply, err = r.(float64), e
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -252,10 +252,10 @@ func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error)
|
||||
if rs.Bal != nil {
|
||||
*reply, err = rs.callMethod(&arg, "Responder.FlushCache")
|
||||
} else {
|
||||
r, e := AccLock.Guard(func() (float64, error) {
|
||||
r, e := AccLock.Guard(func() (interface{}, error) {
|
||||
return 0, arg.FlushCache()
|
||||
}, arg.GetAccountKey())
|
||||
*reply, err = r, e
|
||||
*reply, err = r.(float64), e
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -294,8 +294,7 @@ func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *Cal
|
||||
Logger.Info("<Balancer> Waiting for raters to register...")
|
||||
time.Sleep(1 * time.Second) // wait one second and retry
|
||||
} else {
|
||||
reply = &CallCost{}
|
||||
reply, err = AccLock.GuardCallCost(func() (*CallCost, error) {
|
||||
_, err = AccLock.Guard(func() (interface{}, error) {
|
||||
err = client.Call(method, *key, reply)
|
||||
return reply, err
|
||||
}, key.GetAccountKey())
|
||||
@@ -318,7 +317,7 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float
|
||||
Logger.Info("Waiting for raters to register...")
|
||||
time.Sleep(1 * time.Second) // wait one second and retry
|
||||
} else {
|
||||
reply, err = AccLock.Guard(func() (float64, error) {
|
||||
_, err = AccLock.Guard(func() (interface{}, error) {
|
||||
err = client.Call(method, *key, &reply)
|
||||
return reply, err
|
||||
}, key.GetAccountKey())
|
||||
|
||||
@@ -108,7 +108,7 @@ func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) {
|
||||
newAts = append(newAts, at)
|
||||
}
|
||||
if toBeSaved {
|
||||
engine.AccLock.Guard(func() (float64, error) {
|
||||
engine.AccLock.Guard(func() (interface{}, error) {
|
||||
storage.SetActionTimings(key, newAts)
|
||||
return 0, nil
|
||||
}, engine.ACTION_TIMING_PREFIX)
|
||||
|
||||
Reference in New Issue
Block a user