From ed0090b3343ffa7ee57c3502f4f1302c23fcc6d4 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 24 Jul 2012 17:51:43 +0300 Subject: [PATCH] added years to interval and reflection client --- balancer/balancer.go | 20 ++++---- cmd/cgr-loader/helpers.go | 16 +++---- cmd/cgr-rater/cgr-rater.go | 19 +++++--- cmd/cgr-rater/rater_test.go | 3 +- cmd/cgr-rater/scheduler.go | 6 +-- conf/balancer.config | 10 +--- conf/full.config | 1 + conf/mediator.config | 1 + conf/rater.config | 1 + conf/rater_standalone.config | 1 + conf/scheduler.config | 1 + conf/session_manager.config | 1 + data/Timings.csv | 20 ++++---- data/test.config | 4 +- timespans/action_timing.go | 11 +++-- timespans/dateseries.go | 63 ++++++++++++++++++++++++ timespans/interval.go | 32 ++++++++----- timespans/responder.go | 92 +++++++++++++++++++++++------------- 18 files changed, 204 insertions(+), 98 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 5857156b9..55207e396 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -19,22 +19,26 @@ along with this program. If not, see package balancer import ( - "net/rpc" - "sync" "log" + "sync" ) type Balancer struct { sync.RWMutex - clients map[string]*rpc.Client - balancerChannel chan *rpc.Client + clients map[string]Worker + balancerChannel chan Worker +} + +type Worker interface { + Call(serviceMethod string, args interface{}, reply interface{}) error + Close() error } /* Constructor for RateList holding one slice for addreses and one slice for connections. */ func NewBalancer() *Balancer { - r := &Balancer{clients: make(map[string]*rpc.Client), balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil + r := &Balancer{clients: make(map[string]Worker), balancerChannel: make(chan Worker)} // leaving both slices to nil go func() { for { if len(r.clients) > 0 { @@ -52,7 +56,7 @@ func NewBalancer() *Balancer { /* Adds a client to the two internal slices. */ -func (bl *Balancer) AddClient(address string, client *rpc.Client) { +func (bl *Balancer) AddClient(address string, client Worker) { bl.Lock() defer bl.Unlock() bl.clients[address] = client @@ -72,7 +76,7 @@ func (bl *Balancer) RemoveClient(address string) { /* Returns a client for the specifed address. */ -func (bl *Balancer) GetClient(address string) (c *rpc.Client, exists bool) { +func (bl *Balancer) GetClient(address string) (c Worker, exists bool) { bl.RLock() defer bl.RUnlock() c, exists = bl.clients[address] @@ -82,7 +86,7 @@ func (bl *Balancer) GetClient(address string) (c *rpc.Client, exists bool) { /* Returns the next available connection at each call looping at the end of connections. */ -func (bl *Balancer) Balance() (result *rpc.Client) { +func (bl *Balancer) Balance() (result Worker) { bl.RLock() defer bl.RUnlock() return <-bl.balancerChannel diff --git a/cmd/cgr-loader/helpers.go b/cmd/cgr-loader/helpers.go index 1f4bd87ed..6d6bd3287 100644 --- a/cmd/cgr-loader/helpers.go +++ b/cmd/cgr-loader/helpers.go @@ -63,6 +63,7 @@ func NewRate(destinationsTag, connectFee, price, pricedUnits, rateIncrements str } type Timing struct { + Years timespans.Years Months timespans.Months MonthDays timespans.MonthDays WeekDays timespans.WeekDays @@ -71,15 +72,11 @@ type Timing struct { func NewTiming(timeingInfo ...string) (rt *Timing) { rt = &Timing{} - rt.Months.Parse(timeingInfo[0], ";") - rt.MonthDays.Parse(timeingInfo[1], ";") - rt.WeekDays.Parse(timeingInfo[2], ";") - if timeingInfo[3] == "*now" { - timeTokens := strings.Split(time.Now().Format(time.Stamp), " ") - rt.StartTime = timeTokens[len(timeTokens)-1] - } else { - rt.StartTime = timeingInfo[3] - } + rt.Years.Parse(timeingInfo[0], ";") + rt.Months.Parse(timeingInfo[1], ";") + rt.MonthDays.Parse(timeingInfo[2], ";") + rt.WeekDays.Parse(timeingInfo[3], ";") + rt.StartTime = timeingInfo[4] return } @@ -105,6 +102,7 @@ func NewRateTiming(ratesTag string, timing *Timing, weight string) (rt *RateTimi func (rt *RateTiming) GetInterval(r *Rate) (i *timespans.Interval) { i = ×pans.Interval{ + Years: rt.timing.Years, Months: rt.timing.Months, MonthDays: rt.timing.MonthDays, WeekDays: rt.timing.WeekDays, diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index bd9686ae0..d352b6ef2 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -37,6 +37,7 @@ var ( redis_server = "127.0.0.1:6379" // redis address host:port redis_db = 10 // redis database number + rater_enabled = false // start standalone server (no balancer) rater_standalone = false // start standalone server (no balancer) rater_balancer_server = "127.0.0.1:2000" // balancer address host:port rater_listen = "127.0.0.1:1234" // listening address host:port @@ -79,6 +80,7 @@ func readConfig(configFn string) { redis_server, _ = c.GetString("global", "redis_server") redis_db, _ = c.GetInt("global", "redis_db") + rater_enabled, _ = c.GetBool("rater", "enabled") rater_standalone, _ = c.GetBool("rater", "standalone") rater_balancer_server, _ = c.GetString("rater", "balancer_server") rater_listen, _ = c.GetString("rater", "listen_api") @@ -152,7 +154,10 @@ func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU()) readConfig(*config) - if balancer_standalone { + // some consitency checks + if balancer_standalone || balancer_enabled { + balancer_enabled = true + rater_enabled = false rater_standalone = false } getter, err := timespans.NewRedisStorage(redis_server, redis_db) @@ -163,21 +168,23 @@ func main() { defer getter.Close() timespans.SetStorageGetter(getter) - if !rater_standalone && !balancer_enabled { + if rater_enabled && !rater_standalone && !balancer_enabled { go registerToBalancer() go stopRaterSingnalHandler() } - responder := ×pans.Responder{Bal: bal, ExitChan: exitChan} - if !balancer_enabled { - responder.Rpc = false + responder := ×pans.Responder{ExitChan: exitChan} + if rater_enabled { go listenToRPCRequests(responder, rater_listen, rater_json) } if balancer_enabled { go stopBalancerSingnalHandler() go listenToRPCRequests(new(RaterServer), balancer_listen_rater, false) - responder.Rpc = true + responder.Bal = bal go listenToRPCRequests(responder, balancer_listen_api, balancer_json) go listenToHttpRequests() + if !balancer_standalone { + bal.AddClient("local", ×pans.ResponderWorker{×pans.Responder{ExitChan: exitChan}}) + } } if scheduler_enabled { diff --git a/cmd/cgr-rater/rater_test.go b/cmd/cgr-rater/rater_test.go index b44749acf..e1b6e9a97 100644 --- a/cmd/cgr-rater/rater_test.go +++ b/cmd/cgr-rater/rater_test.go @@ -28,6 +28,7 @@ func TestConfig(t *testing.T) { if redis_server != "test" || redis_db != 1 || + rater_enabled != true || rater_standalone != true || rater_balancer_server != "test" || rater_listen != "test" || @@ -57,6 +58,7 @@ func TestConfig(t *testing.T) { mediator_password != "test" { t.Log(redis_server) t.Log(redis_db) + t.Log(rater_enabled) t.Log(rater_standalone) t.Log(rater_balancer_server) t.Log(rater_json) @@ -65,7 +67,6 @@ func TestConfig(t *testing.T) { t.Log(balancer_listen_rater) t.Log(balancer_listen_api) t.Log(scheduler_enabled) - t.Log(scheduler_json) t.Log(sm_enabled) t.Log(sm_api_server) t.Log(sm_freeswitch_server) diff --git a/cmd/cgr-rater/scheduler.go b/cmd/cgr-rater/scheduler.go index 9e0284743..d78f401f0 100644 --- a/cmd/cgr-rater/scheduler.go +++ b/cmd/cgr-rater/scheduler.go @@ -70,11 +70,7 @@ func loadActionTimings(storage timespans.StorageGetter) { // recreate the queue s.queue = timespans.ActionTimingPriotityList{} for _, at := range actionTimings { - if at.IsOneTimeRun() { - log.Print("Executing: ", at) - go at.Execute() - continue - } + at.CheckForASAP() s.queue = append(s.queue, at) } sort.Sort(s.queue) diff --git a/conf/balancer.config b/conf/balancer.config index b70189010..3ea6ada69 100644 --- a/conf/balancer.config +++ b/conf/balancer.config @@ -18,16 +18,10 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number -[rater] -standalone = true # start standalone server (no balancer) -balancer_server = 127.0.0.1:2000 # balancer address host:port -listen_api = 127.0.0.1:1234 # listening address host:port -json = false # use JSON for RPC encoding - [balancer] -enabled = false +enabled = true standalone = false # run standalone listen_rater = 127.0.0.1:2000 # Rater server address -listen_api = 127.0.0.1:2001 # Json RPC server address +listen_api = 127.0.0.1:2001 # RPC server address web_status_server = 127.0.0.1:8000 # Web server address (for status) json = false # use JSON for RPC encoding diff --git a/conf/full.config b/conf/full.config index fe508c0ad..703f8d200 100644 --- a/conf/full.config +++ b/conf/full.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = false standalone = false # start standalone server (no balancer) balancer_server = 127.0.0.1:2000 # balancer address host:port listen_api = 127.0.0.1:1234 # listening address host:port diff --git a/conf/mediator.config b/conf/mediator.config index 7a3e0612d..cd5c63e89 100644 --- a/conf/mediator.config +++ b/conf/mediator.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = true standalone = true # start standalone server (no balancer) balancer_server = 127.0.0.1:2000 # balancer address host:port listen_api = 127.0.0.1:1234 # listening address host:port diff --git a/conf/rater.config b/conf/rater.config index 1c72f5af1..885c58ddf 100644 --- a/conf/rater.config +++ b/conf/rater.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = true standalone = false # start standalone server (no balancer) balancer_server = 127.0.0.1:2000 # balancer address host:port listen_api = 127.0.0.1:1234 # listening address host:port diff --git a/conf/rater_standalone.config b/conf/rater_standalone.config index a112c6e6a..c976757d8 100644 --- a/conf/rater_standalone.config +++ b/conf/rater_standalone.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = true standalone = true # start standalone server (no balancer) listen_api = 127.0.0.1:1234 # listening address host:port json = false # use JSON for RPC encoding diff --git a/conf/scheduler.config b/conf/scheduler.config index 03171561f..5ea0512ac 100644 --- a/conf/scheduler.config +++ b/conf/scheduler.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = true standalone = true # start standalone server (no balancer) listen_api = 127.0.0.1:1234 # listening address host:port json = false # use JSON for RPC encoding diff --git a/conf/session_manager.config b/conf/session_manager.config index 2c0b902f0..b9252796c 100644 --- a/conf/session_manager.config +++ b/conf/session_manager.config @@ -19,6 +19,7 @@ redis_server = 127.0.0.1:6379 #redis address host:port redis_db = 10 # redis database number [rater] +enabled = true standalone = true # start standalone server (no balancer) balancer_server = 127.0.0.1:2000 # balancer address host:port listen_api = 127.0.0.1:1234 # listening address host:port diff --git a/data/Timings.csv b/data/Timings.csv index 888cbe5df..ae7df5c54 100644 --- a/data/Timings.csv +++ b/data/Timings.csv @@ -1,9 +1,11 @@ -TimingTag,Months,MonthDays,WeekDays,StartTime -WORKDAYS_00,*all,*all,1;2;3;4;5,00:00:00 -WORKDAYS_18,*all,*all,1;2;3;4;5,18:00:00 -WEEKENDS,*all,*all,6;7,00:00:00 -WEEKLY_SAME_TIME,*all,*all,1,*now -FIRST_DAY_OF_MONTH,*all,1,*all,00:00:00 -DAILY_SAME_TIME,*all,*all,*all,*now -ONE_TIME_RUN,*none,*none,*none,*now -WINTER_FIRST_DAYS,1;2;12,1,*none,00:00:00 \ No newline at end of file +Tag,Years,Months,MonthDays,WeekDays,Time +WORKDAYS_00,*all,*all,*all,1;2;3;4;5,00:00:00 +WORKDAYS_18,*all,*all,*all,1;2;3;4;5,18:00:00 +WEEKENDS,*all,*all,*all,6;7,00:00:00 +WEEKLY_SAME_TIME,*all,*all,*all,1,*asap +FIRST_DAY_OF_MONTH,*all,*all,1,*all,00:00:00 +DAILY_SAME_TIME,*all,*all,*all,*all,*asap +ONE_TIME_RUN,2012,*none,*none,*none,*asap +WINTER_FIRST_DAYS,*all,1;2;12,1,*none,00:00:00 +SAT_00,*all,*all,*all,6,00:00:00 +SUN_13,*all,*all,*all,7,13:00:00 \ No newline at end of file diff --git a/data/test.config b/data/test.config index 5f16b1f8b..72c1957ad 100644 --- a/data/test.config +++ b/data/test.config @@ -19,6 +19,7 @@ redis_server = test #redis address host:port redis_db = 1 # redis database number [rater] +enabled = true standalone = true # start standalone server (no balancer) balancer_server = test # balancer address host:port listen_api = test # listening address host:port @@ -30,18 +31,15 @@ standalone = true # run standalone listen_rater = test # Rater server address listen_api = test # Json RPC server address web_status_server = test # Web server address (for status) -json = true # use JSON for RPC encoding [scheduler] enabled = true -json = true # use JSON for RPC encoding [session_manager] enabled = true api_server = test # balancer address host:port freeswitch_server = test # freeswitch address host:port freeswitch_pass = test # freeswitch address host:port -json = true # use JSON for RPC encoding [mediator] enabled = true diff --git a/timespans/action_timing.go b/timespans/action_timing.go index 7a874fe9f..a17552cbf 100644 --- a/timespans/action_timing.go +++ b/timespans/action_timing.go @@ -31,6 +31,7 @@ import ( const ( FORMAT = "2006-1-2 15:04:05 MST" + ASAP = "*asap" ) type ActionTiming struct { @@ -175,12 +176,12 @@ func (at *ActionTiming) Execute() (err error) { return } -func (at *ActionTiming) IsOneTimeRun() bool { - i := at.Timing - if i == nil { - return true +func (at *ActionTiming) CheckForASAP() { + if at.Timing.StartTime == ASAP { + oneMinute, _ := time.ParseDuration("1m") + timeTokens := strings.Split(time.Now().Add(oneMinute).Format(time.Stamp), " ") + at.Timing.StartTime = timeTokens[len(timeTokens)-1] } - return len(i.Months) == 0 && len(i.MonthDays) == 0 && len(i.WeekDays) == 0 } // Structure to store actions according to weight diff --git a/timespans/dateseries.go b/timespans/dateseries.go index 2e664b4ae..cd80c7a67 100644 --- a/timespans/dateseries.go +++ b/timespans/dateseries.go @@ -27,6 +27,69 @@ import ( // "log" ) +// Defines years days series +type Years []int + +func (ys Years) Sort() { + sort.Sort(ys) +} + +func (ys Years) Len() int { + return len(ys) +} + +func (ys Years) Swap(i, j int) { + ys[i], ys[j] = ys[j], ys[i] +} + +func (ys Years) Less(j, i int) bool { + return ys[j] < ys[i] +} + +// Return true if the specified date is inside the series +func (ys Years) Contains(year int) (result bool) { + result = false + for _, yss := range ys { + if yss == year { + result = true + break + } + } + return +} + +// Parse MonthDay elements from string separated by sep. +func (ys *Years) Parse(input, sep string) { + switch input { + case "*all", "*none": + *ys = []int{} + default: + elements := strings.Split(input, sep) + for _, yss := range elements { + if year, err := strconv.Atoi(yss); err == nil { + *ys = append(*ys, year) + } + } + } +} + +func (yss Years) store() (result string) { + for _, ys := range yss { + result += strconv.Itoa(int(ys)) + "," + } + result = strings.TrimRight(result, ",") + return +} + +func (yss *Years) restore(input string) { + for _, ys := range strings.Split(input, ",") { + if ys != "" { + mm, _ := strconv.Atoi(ys) + *yss = append(*yss, mm) + } + } +} + // Defines months series type Months []time.Month diff --git a/timespans/interval.go b/timespans/interval.go index d0266e099..c93725253 100644 --- a/timespans/interval.go +++ b/timespans/interval.go @@ -31,6 +31,7 @@ import ( Defines a time interval for which a certain set of prices will apply */ type Interval struct { + Years Years Months Months MonthDays MonthDays WeekDays WeekDays @@ -42,6 +43,10 @@ type Interval struct { Returns true if the received time result inside the interval */ func (i *Interval) Contains(t time.Time) bool { + // check for years + if len(i.Years) > 0 && !i.Years.Contains(t.Year()) { + return false + } // check for months if len(i.Months) > 0 && !i.Months.Contains(t.Month()) { return false @@ -116,11 +121,12 @@ func (i *Interval) getLeftMargin(t time.Time) (rigthtTime time.Time) { } func (i *Interval) String() string { - return fmt.Sprintf("%v %v %v %v %v", i.Months, i.MonthDays, i.WeekDays, i.StartTime, i.EndTime) + return fmt.Sprintf("%v %v %v %v %v %v", i.Years, i.Months, i.MonthDays, i.WeekDays, i.StartTime, i.EndTime) } func (i *Interval) Equal(o *Interval) bool { - return reflect.DeepEqual(i.Months, o.Months) && + return reflect.DeepEqual(i.Years, o.Years) && + reflect.DeepEqual(i.Months, o.Months) && reflect.DeepEqual(i.MonthDays, o.MonthDays) && reflect.DeepEqual(i.WeekDays, o.WeekDays) && i.StartTime == o.StartTime && @@ -131,6 +137,7 @@ func (i *Interval) Equal(o *Interval) bool { Serializes the intervals for the storag. Used for key-value storages. */ func (i *Interval) store() (result string) { + result += i.Years.store() + ";" result += i.Months.store() + ";" result += i.MonthDays.store() + ";" result += i.WeekDays.store() + ";" @@ -149,14 +156,15 @@ De-serializes the interval for the storage. Used for key-value storages. */ func (i *Interval) restore(input string) { is := strings.Split(input, ";") - i.Months.restore(is[0]) - i.MonthDays.restore(is[1]) - i.WeekDays.restore(is[2]) - i.StartTime = is[3] - i.EndTime = is[4] - i.Weight, _ = strconv.ParseFloat(is[5], 64) - i.ConnectFee, _ = strconv.ParseFloat(is[6], 64) - i.Price, _ = strconv.ParseFloat(is[7], 64) - i.PricedUnits, _ = strconv.ParseFloat(is[8], 64) - i.RateIncrements, _ = strconv.ParseFloat(is[9], 64) + i.Years.restore(is[0]) + i.Months.restore(is[1]) + i.MonthDays.restore(is[2]) + i.WeekDays.restore(is[3]) + i.StartTime = is[4] + i.EndTime = is[5] + i.Weight, _ = strconv.ParseFloat(is[6], 64) + i.ConnectFee, _ = strconv.ParseFloat(is[7], 64) + i.Price, _ = strconv.ParseFloat(is[8], 64) + i.PricedUnits, _ = strconv.ParseFloat(is[9], 64) + i.RateIncrements, _ = strconv.ParseFloat(is[10], 64) } diff --git a/timespans/responder.go b/timespans/responder.go index fee16e9f7..d3fcacfbd 100644 --- a/timespans/responder.go +++ b/timespans/responder.go @@ -23,12 +23,13 @@ import ( "fmt" "github.com/cgrates/cgrates/balancer" "log" + "reflect" "runtime" + "strings" "time" ) type Responder struct { - Rpc bool Bal *balancer.Balancer ExitChan chan bool } @@ -36,12 +37,15 @@ type Responder struct { /* RPC method thet provides the external RPC interface for getting the rating information. */ -func (r *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { - if r.Rpc { - r, e := r.getCallCost(&arg, "Responder.GetCost") +func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { + log.Print("Ready: ", rs.Bal) + if rs.Bal != nil { + r, e := rs.getCallCost(&arg, "Responder.GetCost") *reply, err = *r, e } else { + log.Print("1", arg.GetUserBalanceKey()) r, e := AccLock.GuardGetCost(arg.GetUserBalanceKey(), func() (*CallCost, error) { + log.Print("here", arg) return (&arg).GetCost() }) *reply, err = *r, e @@ -49,9 +53,9 @@ func (r *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { return } -func (r *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { - if r.Rpc { - r, e := r.getCallCost(&arg, "Responder.Debit") +func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { + if rs.Bal != nil { + r, e := rs.getCallCost(&arg, "Responder.Debit") *reply, err = *r, e } else { r, e := AccLock.GuardGetCost(arg.GetUserBalanceKey(), func() (*CallCost, error) { @@ -62,9 +66,9 @@ func (r *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { return } -func (r *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) { - if r.Rpc { - *reply, err = r.callMethod(&arg, "Responder.DebitCents") +func (rs *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) { + if rs.Bal != nil { + *reply, err = rs.callMethod(&arg, "Responder.DebitCents") } else { r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) { return (&arg).DebitCents() @@ -74,9 +78,9 @@ func (r *Responder) DebitCents(arg CallDescriptor, reply *float64) (err error) { return } -func (r *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) { - if r.Rpc { - *reply, err = r.callMethod(&arg, "Responder.DebitSMS") +func (rs *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) { + if rs.Bal != nil { + *reply, err = rs.callMethod(&arg, "Responder.DebitSMS") } else { r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) { return (&arg).DebitSMS() @@ -86,9 +90,9 @@ func (r *Responder) DebitSMS(arg CallDescriptor, reply *float64) (err error) { return } -func (r *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error) { - if r.Rpc { - *reply, err = r.callMethod(&arg, "Responder.DebitSeconds") +func (rs *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error) { + if rs.Bal != nil { + *reply, err = rs.callMethod(&arg, "Responder.DebitSeconds") } else { r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) { return 0, (&arg).DebitSeconds() @@ -98,9 +102,9 @@ func (r *Responder) DebitSeconds(arg CallDescriptor, reply *float64) (err error) return } -func (r *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err error) { - if r.Rpc { - *reply, err = r.callMethod(&arg, "Responder.GetMaxSessionTime") +func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err error) { + if rs.Bal != nil { + *reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime") } else { r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) { return (&arg).GetMaxSessionTime() @@ -110,9 +114,9 @@ func (r *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err e return } -func (r *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) (err error) { - if r.Rpc { - *reply, err = r.callMethod(&arg, "Responder.AddRecievedCallSeconds") +func (rs *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) (err error) { + if rs.Bal != nil { + *reply, err = rs.callMethod(&arg, "Responder.AddRecievedCallSeconds") } else { r, e := AccLock.Guard(arg.GetUserBalanceKey(), func() (float64, error) { @@ -123,13 +127,13 @@ func (r *Responder) AddRecievedCallSeconds(arg CallDescriptor, reply *float64) ( return } -func (r *Responder) Status(arg CallDescriptor, reply *string) (err error) { +func (rs *Responder) Status(arg CallDescriptor, reply *string) (err error) { memstats := new(runtime.MemStats) runtime.ReadMemStats(memstats) - if r.Rpc { + if rs.Bal != nil { *reply = "Connected raters:\n" - for _, rater := range r.Bal.GetClientAddresses() { + for _, rater := range rs.Bal.GetClientAddresses() { log.Print(rater) *reply += fmt.Sprintf("%v\n", rater) } @@ -140,10 +144,12 @@ func (r *Responder) Status(arg CallDescriptor, reply *string) (err error) { return } -func (r *Responder) Shutdown(arg string, reply *string) (err error) { - r.Bal.Shutdown() +func (rs *Responder) Shutdown(arg string, reply *string) (err error) { + if rs.Bal != nil { + rs.Bal.Shutdown() + } storageGetter.Close() - defer func() { r.ExitChan <- true }() + defer func() { rs.ExitChan <- true }() *reply = "Done!" return } @@ -151,10 +157,10 @@ func (r *Responder) Shutdown(arg string, reply *string) (err error) { /* The function that gets the information from the raters using balancer. */ -func (r *Responder) getCallCost(key *CallDescriptor, method string) (reply *CallCost, err error) { +func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *CallCost, err error) { err = errors.New("") //not nil value for err != nil { - client := r.Bal.Balance() + client := rs.Bal.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry @@ -175,10 +181,10 @@ func (r *Responder) getCallCost(key *CallDescriptor, method string) (reply *Call /* The function that gets the information from the raters using balancer. */ -func (r *Responder) callMethod(key *CallDescriptor, method string) (reply float64, err error) { +func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float64, err error) { err = errors.New("") //not nil value for err != nil { - client := r.Bal.Balance() + client := rs.Bal.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry @@ -194,3 +200,25 @@ func (r *Responder) callMethod(key *CallDescriptor, method string) (reply float6 } return } + +type ResponderWorker struct { + R *Responder +} + +func (rw *ResponderWorker) Call(serviceMethod string, args interface{}, reply interface{}) error { + log.Printf("Calling by reflection: %v %v", strings.TrimLeft(serviceMethod, "Responder."), reply) + + method := reflect.ValueOf(rw.R).MethodByName(strings.TrimLeft(serviceMethod, "Responder.")) + r := method.Call([]reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}) + + response := r[0].Interface() + log.Printf("Called: %v %v", reply, response) + if response == nil { + return nil + } + return response.(error) +} + +func (rw *ResponderWorker) Close() error { + return nil +}