diff --git a/engine/responder.go b/engine/responder.go index 049f53ec1..d7e9652ea 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -533,8 +533,9 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error { return nil } -func (rs *Responder) GetTimeout() time.Duration { - return rs.Timeout +func (rs *Responder) GetTimeout(i int, d *time.Duration) error { + *d = rs.Timeout + return nil } // Reflection worker type for not standalone balancer @@ -585,7 +586,7 @@ type Connector interface { ProcessCdr(*StoredCdr, *string) error LogCallCost(*CallCostLog, *string) error GetLCR(*AttrGetLcr, *LCRCost) error - GetTimeout() time.Duration + GetTimeout(int, *time.Duration) error } type RPCClientConnector struct { @@ -637,8 +638,9 @@ func (rcc *RPCClientConnector) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { return rcc.Client.Call("Responder.GetLCR", attrs, reply) } -func (rcc *RPCClientConnector) GetTimeout() time.Duration { - return rcc.Timeout +func (rcc *RPCClientConnector) GetTimeout(i int, d *time.Duration) error { + *d = rcc.Timeout + return nil } type ConnectorPool []Connector @@ -647,12 +649,16 @@ func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error { for _, con := range cp { c := make(chan error, 1) callCost := &CallCost{} + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetCost(cd, callCost) }() select { case err := <-c: *cc = *callCost return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -663,12 +669,16 @@ func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error { for _, con := range cp { c := make(chan error, 1) callCost := &CallCost{} + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.Debit(cd, callCost) }() select { case err := <-c: *cc = *callCost return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -679,12 +689,16 @@ func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error { for _, con := range cp { c := make(chan error, 1) callCost := &CallCost{} + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.MaxDebit(cd, callCost) }() select { case err := <-c: *cc = *callCost return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -695,12 +709,16 @@ func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) erro for _, con := range cp { c := make(chan error, 1) var r float64 + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.RefundIncrements(cd, &r) }() select { case err := <-c: *resp = r return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -711,12 +729,16 @@ func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) err for _, con := range cp { c := make(chan error, 1) var r float64 + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetMaxSessionTime(cd, &r) }() select { case err := <-c: *resp = r return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -727,12 +749,16 @@ func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) for _, con := range cp { c := make(chan error, 1) var r float64 + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetDerivedMaxSessionTime(ev, &r) }() select { case err := <-c: *reply = r return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -743,12 +769,16 @@ func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) erro for _, con := range cp { c := make(chan error, 1) sr := make([]*SessionRun, 0) + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetSessionRuns(ev, &sr) }() select { case err := <-c: *sRuns = sr return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -759,12 +789,16 @@ func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs for _, con := range cp { c := make(chan error, 1) derivedChargers := utils.DerivedChargers{} + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetDerivedChargers(attrs, &derivedChargers) }() select { case err := <-c: *dcs = derivedChargers return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -775,12 +809,16 @@ func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error { for _, con := range cp { c := make(chan error, 1) var r string + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.ProcessCdr(cdr, &r) }() select { case err := <-c: *reply = r return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -791,12 +829,16 @@ func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error { for _, con := range cp { c := make(chan error, 1) var r string + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.LogCallCost(ccl, &r) }() select { case err := <-c: *reply = r return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } @@ -807,18 +849,23 @@ func (cp ConnectorPool) GetLCR(attr *AttrGetLcr, reply *LCRCost) error { for _, con := range cp { c := make(chan error, 1) lcrCost := &LCRCost{} + + var timeout time.Duration + con.GetTimeout(0, &timeout) + go func() { c <- con.GetLCR(attr, lcrCost) }() select { case err := <-c: *reply = *lcrCost return err - case <-time.After(con.GetTimeout()): + case <-time.After(timeout): // call timed out, continue } } return utils.ErrTimedOut } -func (cp ConnectorPool) GetTimeout() time.Duration { - return 0 +func (cp ConnectorPool) GetTimeout(i int, d *time.Duration) error { + *d = 0 + return nil } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index c95df35d7..bf0b47ace 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -511,7 +511,6 @@ func (tpr *TpReader) LoadActions() (err error) { ExtraParameters: tpact.ExtraParameters, ExpirationString: tpact.ExpiryTime, Balance: &Balance{ - Uuid: utils.GenUUID(), Id: tpact.BalanceId, Value: tpact.Units, Weight: tpact.BalanceWeight, @@ -805,7 +804,6 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error ExtraParameters: tpact.ExtraParameters, ExpirationString: tpact.ExpiryTime, Balance: &Balance{ - Uuid: utils.GenUUID(), Value: tpact.Units, Weight: tpact.BalanceWeight, RatingSubject: tpact.RatingSubject, @@ -1007,7 +1005,6 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) { ExtraParameters: tpact.ExtraParameters, ExpirationString: tpact.ExpiryTime, Balance: &Balance{ - Uuid: utils.GenUUID(), Value: tpact.Units, Weight: tpact.BalanceWeight, RatingSubject: tpact.RatingSubject,