fix duplicate balance uuid issue

This commit is contained in:
Radu Ioan Fericean
2015-09-16 18:18:12 +03:00
parent 3b9ff9ea7e
commit 55aa957691
2 changed files with 65 additions and 21 deletions

View File

@@ -533,8 +533,9 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error {
return nil
}
func (rs *Responder) GetTimeout() time.Duration {
return rs.Timeout
func (rs *Responder) GetTimeout(i int, d *time.Duration) error {
*d = rs.Timeout
return nil
}
// Reflection worker type for not standalone balancer
@@ -585,7 +586,7 @@ type Connector interface {
ProcessCdr(*StoredCdr, *string) error
LogCallCost(*CallCostLog, *string) error
GetLCR(*AttrGetLcr, *LCRCost) error
GetTimeout() time.Duration
GetTimeout(int, *time.Duration) error
}
type RPCClientConnector struct {
@@ -637,8 +638,9 @@ func (rcc *RPCClientConnector) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
return rcc.Client.Call("Responder.GetLCR", attrs, reply)
}
func (rcc *RPCClientConnector) GetTimeout() time.Duration {
return rcc.Timeout
func (rcc *RPCClientConnector) GetTimeout(i int, d *time.Duration) error {
*d = rcc.Timeout
return nil
}
type ConnectorPool []Connector
@@ -647,12 +649,16 @@ func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetCost(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -663,12 +669,16 @@ func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.Debit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -679,12 +689,16 @@ func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error {
for _, con := range cp {
c := make(chan error, 1)
callCost := &CallCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.MaxDebit(cd, callCost) }()
select {
case err := <-c:
*cc = *callCost
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -695,12 +709,16 @@ func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) erro
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.RefundIncrements(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -711,12 +729,16 @@ func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) err
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetMaxSessionTime(cd, &r) }()
select {
case err := <-c:
*resp = r
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -727,12 +749,16 @@ func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64)
for _, con := range cp {
c := make(chan error, 1)
var r float64
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -743,12 +769,16 @@ func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) erro
for _, con := range cp {
c := make(chan error, 1)
sr := make([]*SessionRun, 0)
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetSessionRuns(ev, &sr) }()
select {
case err := <-c:
*sRuns = sr
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -759,12 +789,16 @@ func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs
for _, con := range cp {
c := make(chan error, 1)
derivedChargers := utils.DerivedChargers{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }()
select {
case err := <-c:
*dcs = derivedChargers
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -775,12 +809,16 @@ func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.ProcessCdr(cdr, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -791,12 +829,16 @@ func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error {
for _, con := range cp {
c := make(chan error, 1)
var r string
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.LogCallCost(ccl, &r) }()
select {
case err := <-c:
*reply = r
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
@@ -807,18 +849,23 @@ func (cp ConnectorPool) GetLCR(attr *AttrGetLcr, reply *LCRCost) error {
for _, con := range cp {
c := make(chan error, 1)
lcrCost := &LCRCost{}
var timeout time.Duration
con.GetTimeout(0, &timeout)
go func() { c <- con.GetLCR(attr, lcrCost) }()
select {
case err := <-c:
*reply = *lcrCost
return err
case <-time.After(con.GetTimeout()):
case <-time.After(timeout):
// call timed out, continue
}
}
return utils.ErrTimedOut
}
func (cp ConnectorPool) GetTimeout() time.Duration {
return 0
func (cp ConnectorPool) GetTimeout(i int, d *time.Duration) error {
*d = 0
return nil
}

View File

@@ -511,7 +511,6 @@ func (tpr *TpReader) LoadActions() (err error) {
ExtraParameters: tpact.ExtraParameters,
ExpirationString: tpact.ExpiryTime,
Balance: &Balance{
Uuid: utils.GenUUID(),
Id: tpact.BalanceId,
Value: tpact.Units,
Weight: tpact.BalanceWeight,
@@ -805,7 +804,6 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
ExtraParameters: tpact.ExtraParameters,
ExpirationString: tpact.ExpiryTime,
Balance: &Balance{
Uuid: utils.GenUUID(),
Value: tpact.Units,
Weight: tpact.BalanceWeight,
RatingSubject: tpact.RatingSubject,
@@ -1007,7 +1005,6 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) {
ExtraParameters: tpact.ExtraParameters,
ExpirationString: tpact.ExpiryTime,
Balance: &Balance{
Uuid: utils.GenUUID(),
Value: tpact.Units,
Weight: tpact.BalanceWeight,
RatingSubject: tpact.RatingSubject,