added years to interval and reflection client

This commit is contained in:
Radu Ioan Fericean
2012-07-24 17:51:43 +03:00
parent 2761fe68c2
commit ed0090b334
18 changed files with 204 additions and 98 deletions

View File

@@ -19,22 +19,26 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package balancer
import (
"net/rpc"
"sync"
"log"
"sync"
)
type Balancer struct {
sync.RWMutex
clients map[string]*rpc.Client
balancerChannel chan *rpc.Client
clients map[string]Worker
balancerChannel chan Worker
}
type Worker interface {
Call(serviceMethod string, args interface{}, reply interface{}) error
Close() error
}
/*
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewBalancer() *Balancer {
r := &Balancer{clients: make(map[string]*rpc.Client), balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil
r := &Balancer{clients: make(map[string]Worker), balancerChannel: make(chan Worker)} // leaving both slices to nil
go func() {
for {
if len(r.clients) > 0 {
@@ -52,7 +56,7 @@ func NewBalancer() *Balancer {
/*
Adds a client to the two internal slices.
*/
func (bl *Balancer) AddClient(address string, client *rpc.Client) {
func (bl *Balancer) AddClient(address string, client Worker) {
bl.Lock()
defer bl.Unlock()
bl.clients[address] = client
@@ -72,7 +76,7 @@ func (bl *Balancer) RemoveClient(address string) {
/*
Returns a client for the specifed address.
*/
func (bl *Balancer) GetClient(address string) (c *rpc.Client, exists bool) {
func (bl *Balancer) GetClient(address string) (c Worker, exists bool) {
bl.RLock()
defer bl.RUnlock()
c, exists = bl.clients[address]
@@ -82,7 +86,7 @@ func (bl *Balancer) GetClient(address string) (c *rpc.Client, exists bool) {
/*
Returns the next available connection at each call looping at the end of connections.
*/
func (bl *Balancer) Balance() (result *rpc.Client) {
func (bl *Balancer) Balance() (result Worker) {
bl.RLock()
defer bl.RUnlock()
return <-bl.balancerChannel

View File

@@ -63,6 +63,7 @@ func NewRate(destinationsTag, connectFee, price, pricedUnits, rateIncrements str
}
type Timing struct {
Years timespans.Years
Months timespans.Months
MonthDays timespans.MonthDays
WeekDays timespans.WeekDays
@@ -71,15 +72,11 @@ type Timing struct {
func NewTiming(timeingInfo ...string) (rt *Timing) {
rt = &Timing{}
rt.Months.Parse(timeingInfo[0], ";")
rt.MonthDays.Parse(timeingInfo[1], ";")
rt.WeekDays.Parse(timeingInfo[2], ";")
if timeingInfo[3] == "*now" {
timeTokens := strings.Split(time.Now().Format(time.Stamp), " ")
rt.StartTime = timeTokens[len(timeTokens)-1]
} else {
rt.StartTime = timeingInfo[3]
}
rt.Years.Parse(timeingInfo[0], ";")
rt.Months.Parse(timeingInfo[1], ";")
rt.MonthDays.Parse(timeingInfo[2], ";")
rt.WeekDays.Parse(timeingInfo[3], ";")
rt.StartTime = timeingInfo[4]
return
}
@@ -105,6 +102,7 @@ func NewRateTiming(ratesTag string, timing *Timing, weight string) (rt *RateTimi
func (rt *RateTiming) GetInterval(r *Rate) (i *timespans.Interval) {
i = &timespans.Interval{
Years: rt.timing.Years,
Months: rt.timing.Months,
MonthDays: rt.timing.MonthDays,
WeekDays: rt.timing.WeekDays,

View File

@@ -37,6 +37,7 @@ var (
redis_server = "127.0.0.1:6379" // redis address host:port
redis_db = 10 // redis database number
rater_enabled = false // start standalone server (no balancer)
rater_standalone = false // start standalone server (no balancer)
rater_balancer_server = "127.0.0.1:2000" // balancer address host:port
rater_listen = "127.0.0.1:1234" // listening address host:port
@@ -79,6 +80,7 @@ func readConfig(configFn string) {
redis_server, _ = c.GetString("global", "redis_server")
redis_db, _ = c.GetInt("global", "redis_db")
rater_enabled, _ = c.GetBool("rater", "enabled")
rater_standalone, _ = c.GetBool("rater", "standalone")
rater_balancer_server, _ = c.GetString("rater", "balancer_server")
rater_listen, _ = c.GetString("rater", "listen_api")
@@ -152,7 +154,10 @@ func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
readConfig(*config)
if balancer_standalone {
// some consitency checks
if balancer_standalone || balancer_enabled {
balancer_enabled = true
rater_enabled = false
rater_standalone = false
}
getter, err := timespans.NewRedisStorage(redis_server, redis_db)
@@ -163,21 +168,23 @@ func main() {
defer getter.Close()
timespans.SetStorageGetter(getter)
if !rater_standalone && !balancer_enabled {
if rater_enabled && !rater_standalone && !balancer_enabled {
go registerToBalancer()
go stopRaterSingnalHandler()
}
responder := &timespans.Responder{Bal: bal, ExitChan: exitChan}
if !balancer_enabled {
responder.Rpc = false
responder := &timespans.Responder{ExitChan: exitChan}
if rater_enabled {
go listenToRPCRequests(responder, rater_listen, rater_json)
}
if balancer_enabled {
go stopBalancerSingnalHandler()
go listenToRPCRequests(new(RaterServer), balancer_listen_rater, false)
responder.Rpc = true
responder.Bal = bal
go listenToRPCRequests(responder, balancer_listen_api, balancer_json)
go listenToHttpRequests()
if !balancer_standalone {
bal.AddClient("local", &timespans.ResponderWorker{&timespans.Responder{ExitChan: exitChan}})
}
}
if scheduler_enabled {

View File

@@ -28,6 +28,7 @@ func TestConfig(t *testing.T) {
if redis_server != "test" ||
redis_db != 1 ||
rater_enabled != true ||
rater_standalone != true ||
rater_balancer_server != "test" ||
rater_listen != "test" ||
@@ -57,6 +58,7 @@ func TestConfig(t *testing.T) {
mediator_password != "test" {
t.Log(redis_server)
t.Log(redis_db)
t.Log(rater_enabled)
t.Log(rater_standalone)
t.Log(rater_balancer_server)
t.Log(rater_json)
@@ -65,7 +67,6 @@ func TestConfig(t *testing.T) {
t.Log(balancer_listen_rater)
t.Log(balancer_listen_api)
t.Log(scheduler_enabled)
t.Log(scheduler_json)
t.Log(sm_enabled)
t.Log(sm_api_server)
t.Log(sm_freeswitch_server)

View File

@@ -70,11 +70,7 @@ func loadActionTimings(storage timespans.StorageGetter) {
// recreate the queue
s.queue = timespans.ActionTimingPriotityList{}
for _, at := range actionTimings {
if at.IsOneTimeRun() {
log.Print("Executing: ", at)
go at.Execute()
continue
}
at.CheckForASAP()
s.queue = append(s.queue, at)
}
sort.Sort(s.queue)

View File

@@ -18,16 +18,10 @@
redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
standalone = true # start standalone server (no balancer)
balancer_server = 127.0.0.1:2000 # balancer address host:port
listen_api = 127.0.0.1:1234 # listening address host:port
json = false # use JSON for RPC encoding
[balancer]
enabled = false
enabled = true
standalone = false # run standalone
listen_rater = 127.0.0.1:2000 # Rater server address
listen_api = 127.0.0.1:2001 # Json RPC server address
listen_api = 127.0.0.1:2001 # RPC server address
web_status_server = 127.0.0.1:8000 # Web server address (for status)
json = false # use JSON for RPC encoding

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = false
standalone = false # start standalone server (no balancer)
balancer_server = 127.0.0.1:2000 # balancer address host:port
listen_api = 127.0.0.1:1234 # listening address host:port

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
standalone = true # start standalone server (no balancer)
balancer_server = 127.0.0.1:2000 # balancer address host:port
listen_api = 127.0.0.1:1234 # listening address host:port

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
standalone = false # start standalone server (no balancer)
balancer_server = 127.0.0.1:2000 # balancer address host:port
listen_api = 127.0.0.1:1234 # listening address host:port

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
standalone = true # start standalone server (no balancer)
listen_api = 127.0.0.1:1234 # listening address host:port
json = false # use JSON for RPC encoding

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
standalone = true # start standalone server (no balancer)
listen_api = 127.0.0.1:1234 # listening address host:port
json = false # use JSON for RPC encoding

View File

@@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
standalone = true # start standalone server (no balancer)
balancer_server = 127.0.0.1:2000 # balancer address host:port
listen_api = 127.0.0.1:1234 # listening address host:port

View File

@@ -1,9 +1,11 @@
TimingTag,Months,MonthDays,WeekDays,StartTime
WORKDAYS_00,*all,*all,1;2;3;4;5,00:00:00
WORKDAYS_18,*all,*all,1;2;3;4;5,18:00:00
WEEKENDS,*all,*all,6;7,00:00:00
WEEKLY_SAME_TIME,*all,*all,1,*now
FIRST_DAY_OF_MONTH,*all,1,*all,00:00:00
DAILY_SAME_TIME,*all,*all,*all,*now
ONE_TIME_RUN,*none,*none,*none,*now
WINTER_FIRST_DAYS,1;2;12,1,*none,00:00:00
Tag,Years,Months,MonthDays,WeekDays,Time
WORKDAYS_00,*all,*all,*all,1;2;3;4;5,00:00:00
WORKDAYS_18,*all,*all,*all,1;2;3;4;5,18:00:00
WEEKENDS,*all,*all,*all,6;7,00:00:00
WEEKLY_SAME_TIME,*all,*all,*all,1,*asap
FIRST_DAY_OF_MONTH,*all,*all,1,*all,00:00:00
DAILY_SAME_TIME,*all,*all,*all,*all,*asap
ONE_TIME_RUN,2012,*none,*none,*none,*asap
WINTER_FIRST_DAYS,*all,1;2;12,1,*none,00:00:00
SAT_00,*all,*all,*all,6,00:00:00
SUN_13,*all,*all,*all,7,13:00:00
1 TimingTag Tag Years Months MonthDays StartTime WeekDays Time
2 WORKDAYS_00 *all *all *all 00:00:00 1;2;3;4;5 00:00:00
3 WORKDAYS_18 *all *all *all 18:00:00 1;2;3;4;5 18:00:00
4 WEEKENDS *all *all *all 00:00:00 6;7 00:00:00
5 WEEKLY_SAME_TIME *all *all *all *now 1 *asap
6 FIRST_DAY_OF_MONTH *all *all 1 00:00:00 *all 00:00:00
7 DAILY_SAME_TIME *all *all *all *now *all *asap
8 ONE_TIME_RUN 2012 *none *none *now *none *asap
9 WINTER_FIRST_DAYS *all 1;2;12 1 00:00:00 *none 00:00:00
10 SAT_00 *all *all *all 6 00:00:00
11 SUN_13 *all *all *all 7 13:00:00

View File

@@ -19,6 +19,7 @@ redis_server = test #redis address host:port
redis_db = 1 # redis database number
[rater]
enabled = true
standalone = true # start standalone server (no balancer)
balancer_server = test # balancer address host:port
listen_api = test # listening address host:port
@@ -30,18 +31,15 @@ standalone = true # run standalone
listen_rater = test # Rater server address
listen_api = test # Json RPC server address
web_status_server = test # Web server address (for status)
json = true # use JSON for RPC encoding
[scheduler]
enabled = true
json = true # use JSON for RPC encoding
[session_manager]
enabled = true
api_server = test # balancer address host:port
freeswitch_server = test # freeswitch address host:port
freeswitch_pass = test # freeswitch address host:port
json = true # use JSON for RPC encoding
[mediator]
enabled = true

View File

@@ -31,6 +31,7 @@ import (
const (
FORMAT = "2006-1-2 15:04:05 MST"
ASAP = "*asap"
)
type ActionTiming struct {
@@ -175,12 +176,12 @@ func (at *ActionTiming) Execute() (err error) {
return
}
func (at *ActionTiming) IsOneTimeRun() bool {
i := at.Timing
if i == nil {
return true
func (at *ActionTiming) CheckForASAP() {
if at.Timing.StartTime == ASAP {
oneMinute, _ := time.ParseDuration("1m")
timeTokens := strings.Split(time.Now().Add(oneMinute).Format(time.Stamp), " ")
at.Timing.StartTime = timeTokens[len(timeTokens)-1]
}
return len(i.Months) == 0 && len(i.MonthDays) == 0 && len(i.WeekDays) == 0
}
// Structure to store actions according to weight

View File

@@ -27,6 +27,69 @@ import (
// "log"
)
// Defines years days series
type Years []int
func (ys Years) Sort() {
sort.Sort(ys)
}
func (ys Years) Len() int {
return len(ys)
}
func (ys Years) Swap(i, j int) {
ys[i], ys[j] = ys[j], ys[i]
}
func (ys Years) Less(j, i int) bool {
return ys[j] < ys[i]
}
// Return true if the specified date is inside the series
func (ys Years) Contains(year int) (result bool) {
result = false
for _, yss := range ys {
if yss == year {
result = true
break
}
}
return
}
// Parse MonthDay elements from string separated by sep.
func (ys *Years) Parse(input, sep string) {
switch input {
case "*all", "*none":
*ys = []int{}
default:
elements := strings.Split(input, sep)
for _, yss := range elements {
if year, err := strconv.Atoi(yss); err == nil {
*ys = append(*ys, year)
}
}
}
}
func (yss Years) store() (result string) {
for _, ys := range yss {
result += strconv.Itoa(int(ys)) + ","
}
result = strings.TrimRight(result, ",")
return
}
func (yss *Years) restore(input string) {
for _, ys := range strings.Split(input, ",") {
if ys != "" {
mm, _ := strconv.Atoi(ys)
*yss = append(*yss, mm)
}
}
}
// Defines months series
type Months []time.Month

View File

@@ -31,6 +31,7 @@ import (
Defines a time interval for which a certain set of prices will apply
*/
type Interval struct {
Years Years
Months Months
MonthDays MonthDays
WeekDays WeekDays
@@ -42,6 +43,10 @@ type Interval struct {
Returns true if the received time result inside the interval
*/
func (i *Interval) Contains(t time.Time) bool {
// check for years
if len(i.Years) > 0 && !i.Years.Contains(t.Year()) {
return false
}
// check for months
if len(i.Months) > 0 && !i.Months.Contains(t.Month()) {
return false
@@ -116,11 +121,12 @@ func (i *Interval) getLeftMargin(t time.Time) (rigthtTime time.Time) {
}
func (i *Interval) String() string {
return fmt.Sprintf("%v %v %v %v %v", i.Months, i.MonthDays, i.WeekDays, i.StartTime, i.EndTime)
return fmt.Sprintf("%v %v %v %v %v %v", i.Years, i.Months, i.MonthDays, i.WeekDays, i.StartTime, i.EndTime)
}
func (i *Interval) Equal(o *Interval) bool {
return reflect.DeepEqual(i.Months, o.Months) &&
return reflect.DeepEqual(i.Years, o.Years) &&
reflect.DeepEqual(i.Months, o.Months) &&
reflect.DeepEqual(i.MonthDays, o.MonthDays) &&
reflect.DeepEqual(i.WeekDays, o.WeekDays) &&
i.StartTime == o.StartTime &&
@@ -131,6 +137,7 @@ func (i *Interval) Equal(o *Interval) bool {
Serializes the intervals for the storag. Used for key-value storages.
*/
func (i *Interval) store() (result string) {
result += i.Years.store() + ";"
result += i.Months.store() + ";"
result += i.MonthDays.store() + ";"
result += i.WeekDays.store() + ";"
@@ -149,14 +156,15 @@ De-serializes the interval for the storage. Used for key-value storages.
*/
func (i *Interval) restore(input string) {
is := strings.Split(input, ";")
i.Months.restore(is[0])
i.MonthDays.restore(is[1])
i.WeekDays.restore(is[2])
i.StartTime = is[3]
i.EndTime = is[4]
i.Weight, _ = strconv.ParseFloat(is[5], 64)
i.ConnectFee, _ = strconv.ParseFloat(is[6], 64)
i.Price, _ = strconv.ParseFloat(is[7], 64)
i.PricedUnits, _ = strconv.ParseFloat(is[8], 64)
i.RateIncrements, _ = strconv.ParseFloat(is[9], 64)
i.Years.restore(is[0])
i.Months.restore(is[1])
i.MonthDays.restore(is[2])
i.WeekDays.restore(is[3])
i.StartTime = is[4]
i.EndTime = is[5]
i.Weight, _ = strconv.ParseFloat(is[6], 64)
i.ConnectFee, _ = strconv.ParseFloat(is[7], 64)
i.Price, _ = strconv.ParseFloat(is[8], 64)
i.PricedUnits, _ = strconv.ParseFloat(is[9], 64)
i.RateIncrements, _ = strconv.ParseFloat(is[10], 64)
}

View File

@@ -23,12 +23,13 @@ import (
"fmt"
"github.com/cgrates/cgrates/balancer"
"log"
"reflect"
"runtime"
"strings"
"time"
)
type Responder struct {
Rpc bool
Bal *balancer.Balancer
ExitChan chan bool
}
@@ -36,12 +37,15 @@ type Responder struct {
/*
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (r *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
if r.Rpc {
r, e := r.getCallCost(&arg, "Responder.GetCost")
func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
log.Print("Ready: ", rs.Bal)
if rs.Bal != nil {
r, e := rs.getCallCost(&arg, "Responder.GetCost")
*reply, err = *r, e
} else {
log.Print("1", arg.GetUserBalanceKey())
r, e := AccLock.GuardGetCost(arg.GetUserBalanceKey(), func() (*CallCost, error) {
log.Print("here", arg)
return (&arg).GetCost()
})
*reply, err = *r, e
@@ -49,9 +53,9 @@ func (r *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
return
}
func (r *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
if r.Rpc {
r, e := r.getCallCost(&arg, "Responder.Debit")
func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
if rs.Bal != nil {
r, e := rs.getCallCost(&arg, "Responder.Debit")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetUserBalanceKey(), func() (*CallCost, error) {
@@ -62,9 +66,9 @@ func (r *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
return
}
func (r *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) {
if r.Rpc {
*reply, err = r.callMethod(&arg, "Responder.DebitCents")
func (rs *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) {
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.DebitCents")
} else {
r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) {
return (&arg).DebitCents()
@@ -74,9 +78,9 @@ func (r *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) {
return
}
func (r *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) {
if r.Rpc {
*reply, err = r.callMethod(&arg, "Responder.DebitSMS")
func (rs *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) {
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.DebitSMS")
} else {
r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) {
return (&arg).DebitSMS()
@@ -86,9 +90,9 @@ func (r *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) {
return
}
func (r *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error) {
if r.Rpc {
*reply, err = r.callMethod(&arg, "Responder.DebitSeconds")
func (rs *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error) {
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.DebitSeconds")
} else {
r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) {
return 0, (&arg).DebitSeconds()
@@ -98,9 +102,9 @@ func (r *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error)
return
}
func (r *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err error) {
if r.Rpc {
*reply, err = r.callMethod(&arg, "Responder.GetMaxSessionTime")
func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err error) {
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime")
} else {
r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) {
return (&arg).GetMaxSessionTime()
@@ -110,9 +114,9 @@ func (r *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err e
return
}
func (r *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) (err error) {
if r.Rpc {
*reply, err = r.callMethod(&arg, "Responder.AddRecievedCallSeconds")
func (rs *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) (err error) {
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.AddRecievedCallSeconds")
} else {
r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) {
@@ -123,13 +127,13 @@ func (r *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) (
return
}
func (r *Responder) Status(arg CallDescriptor, reply *string) (err error) {
func (rs *Responder) Status(arg CallDescriptor, reply *string) (err error) {
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
if r.Rpc {
if rs.Bal != nil {
*reply = "Connected raters:\n"
for _, rater := range r.Bal.GetClientAddresses() {
for _, rater := range rs.Bal.GetClientAddresses() {
log.Print(rater)
*reply += fmt.Sprintf("%v\n", rater)
}
@@ -140,10 +144,12 @@ func (r *Responder) Status(arg CallDescriptor, reply *string) (err error) {
return
}
func (r *Responder) Shutdown(arg string, reply *string) (err error) {
r.Bal.Shutdown()
func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
if rs.Bal != nil {
rs.Bal.Shutdown()
}
storageGetter.Close()
defer func() { r.ExitChan <- true }()
defer func() { rs.ExitChan <- true }()
*reply = "Done!"
return
}
@@ -151,10 +157,10 @@ func (r *Responder) Shutdown(arg string, reply *string) (err error) {
/*
The function that gets the information from the raters using balancer.
*/
func (r *Responder) getCallCost(key *CallDescriptor, method string) (reply *CallCost, err error) {
func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *CallCost, err error) {
err = errors.New("") //not nil value
for err != nil {
client := r.Bal.Balance()
client := rs.Bal.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
@@ -175,10 +181,10 @@ func (r *Responder) getCallCost(key *CallDescriptor, method string) (reply *Call
/*
The function that gets the information from the raters using balancer.
*/
func (r *Responder) callMethod(key *CallDescriptor, method string) (reply float64, err error) {
func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float64, err error) {
err = errors.New("") //not nil value
for err != nil {
client := r.Bal.Balance()
client := rs.Bal.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
@@ -194,3 +200,25 @@ func (r *Responder) callMethod(key *CallDescriptor, method string) (reply float6
}
return
}
type ResponderWorker struct {
R *Responder
}
func (rw *ResponderWorker) Call(serviceMethod string, args interface{}, reply interface{}) error {
log.Printf("Calling by reflection: %v %v", strings.TrimLeft(serviceMethod, "Responder."), reply)
method := reflect.ValueOf(rw.R).MethodByName(strings.TrimLeft(serviceMethod, "Responder."))
r := method.Call([]reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)})
response := r[0].Interface()
log.Printf("Called: %v %v", reply, response)
if response == nil {
return nil
}
return response.(error)
}
func (rw *ResponderWorker) Close() error {
return nil
}