implemented new locking mechanism

This commit is contained in:
Radu Ioan Fericean
2012-07-09 19:30:45 +03:00
parent d377d19a63
commit ac510fd200
13 changed files with 141 additions and 152 deletions

View File

@@ -27,27 +27,24 @@ import (
"log"
"runtime"
"time"
"sync"
)
var (
raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)")
jsonRpcAddress = flag.String("jsonrpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)")
httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)")
freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
js = flag.Bool("json", false, "use JSON for RPC encoding")
bal *balancer.Balancer
balancerRWMutex sync.RWMutex
raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)")
jsonRpcAddress = flag.String("jsonrpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)")
httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)")
freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
js = flag.Bool("json", false, "use JSON for RPC encoding")
bal *balancer.Balancer
accLock = timespans.NewAccountLock()
)
/*
The function that gets the information from the raters using balancer.
*/
func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost) {
// balancerRWMutex.RLock()
// defer balancerRWMutex.RUnlock()
err := errors.New("") //not nil value
func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) {
err = errors.New("") //not nil value
for err != nil {
client := bal.Balance()
if client == nil {
@@ -55,7 +52,10 @@ func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply = &timespans.CallCost{}
err = client.Call(method, *key, reply)
reply, err = timespans.AccLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) {
err = client.Call(method, *key, reply)
return reply, err
})
if err != nil {
log.Printf("Got en error from rater: %v", err)
}
@@ -67,17 +67,20 @@ func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans
/*
The function that gets the information from the raters using balancer.
*/
func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) {
func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) {
// balancerRWMutex.Lock()
// defer balancerRWMutex.Unlock()
err := errors.New("") //not nil value
err = errors.New("") //not nil value
for err != nil {
client := bal.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
} else {
err = client.Call(method, *key, &reply)
reply, err = timespans.AccLock.Guard(key.GetKey(), func() (float64, error) {
err = client.Call(method, *key, &reply)
return reply, err
})
if err != nil {
log.Printf("Got en error from rater: %v", err)
}

View File

@@ -45,7 +45,7 @@ func getCostHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0]}
callCost := GetCallCost(arg, "Responder.GetCost")
callCost, _ := GetCallCost(arg, "Responder.GetCost")
enc.Encode(callCost)
}
@@ -67,7 +67,7 @@ func debitBalanceHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount}
result := CallMethod(arg, "Responder.DebitCents")
result, _ := CallMethod(arg, "Responder.DebitCents")
enc.Encode(result)
}
@@ -89,7 +89,7 @@ func debitSMSHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount}
result := CallMethod(arg, "Responder.DebitSMS")
result, _ := CallMethod(arg, "Responder.DebitSMS")
enc.Encode(result)
}
@@ -111,7 +111,7 @@ func debitSecondsHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount}
result := CallMethod(arg, "Responder.DebitSeconds")
result, _ := CallMethod(arg, "Responder.DebitSeconds")
enc.Encode(result)
}
@@ -133,7 +133,7 @@ func getMaxSessionTimeHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount}
result := CallMethod(arg, "Responder.GetMaxSessionTime")
result, _ := CallMethod(arg, "Responder.GetMaxSessionTime")
enc.Encode(result)
}
@@ -193,7 +193,7 @@ func addRecievedCallSeconds(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount}
result := CallMethod(arg, "Responder.AddRecievedCallSeconds")
result, _ := CallMethod(arg, "Responder.AddRecievedCallSeconds")
enc.Encode(result)
}

View File

@@ -33,37 +33,39 @@ type Responder byte
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
*replay = *GetCallCost(&arg, "Responder.GetCost")
rs, err := GetCallCost(&arg, "Responder.GetCost")
*replay = *rs
return
}
func (r *Responder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
*replay = *GetCallCost(&arg, "Responder.Debit")
rs, err := GetCallCost(&arg, "Responder.Debit")
*replay = *rs
return
}
func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Responder.DebitCents")
*replay, err = CallMethod(&arg, "Responder.DebitCents")
return
}
func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Responder.DebitSMS")
*replay, err = CallMethod(&arg, "Responder.DebitSMS")
return
}
func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Responder.DebitSeconds")
*replay, err = CallMethod(&arg, "Responder.DebitSeconds")
return
}
func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Responder.GetMaxSessionTime")
*replay, err = CallMethod(&arg, "Responder.GetMaxSessionTime")
return
}
func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds")
*replay, err = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds")
return
}

View File

@@ -56,38 +56,50 @@ func NewStorage(nsg timespans.StorageGetter) *Responder {
RPC method providing the rating information from the storage.
*/
func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
r, e := (&cd).GetCost()
r, e := timespans.AccLock.GuardGetCost(cd.GetKey(), func() (*timespans.CallCost, error) {
return (&cd).GetCost()
})
*reply, err = *r, e
return err
}
func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) {
r, e := (&cd).DebitCents()
r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) {
return (&cd).DebitCents()
})
*reply, err = r, e
return err
}
func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) {
r, e := (&cd).DebitSMS()
r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) {
return (&cd).DebitSMS()
})
*reply, err = r, e
return err
}
func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
e := (&cd).DebitSeconds()
*reply, err = 0.0, e
r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) {
return 0, (&cd).DebitSeconds()
})
*reply, err = r, e
return err
}
func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) {
r, e := (&cd).GetMaxSessionTime()
r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) {
return (&cd).GetMaxSessionTime()
})
*reply, err = r, e
return err
}
func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
e := (&cd).AddRecievedCallSeconds()
*reply, err = 0, e
r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) {
return 0, (&cd).AddRecievedCallSeconds()
})
*reply, err = r, e
return err
}

57
timespans/accountlock.go Normal file
View File

@@ -0,0 +1,57 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012 Radu Ioan Fericean
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package timespans
var AccLock *AccountLock
func init() {
AccLock = NewAccountLock()
}
type AccountLock struct {
queue map[string]chan bool
}
func NewAccountLock() *AccountLock {
return &AccountLock{queue: make(map[string]chan bool)}
}
func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) {
lock, exists := AccLock.queue[name]
if !exists {
lock = make(chan bool, 1)
AccLock.queue[name] = lock
}
lock <- true
reply, err = handler()
<-lock
return
}
func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) {
lock, exists := AccLock.queue[name]
if !exists {
lock = make(chan bool, 1)
AccLock.queue[name] = lock
}
lock <- true
reply, err = handler()
<-lock
return
}

View File

@@ -150,8 +150,7 @@ func (at *ActionTiming) getUserBalances() (ubs []*UserBalance) {
}
func (at *ActionTiming) Execute() (err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
// TODO: add sync mutex here
aac, err := at.getActions()
if err != nil {
log.Print("Failed to get actions: ", err)

View File

@@ -175,8 +175,6 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
Splits the received timespan into sub time spans according to the activation periods intervals.
*/
func (cd *CallDescriptor) splitTimeSpan(firstSpan *TimeSpan) (timespans []*TimeSpan) {
userBalancesRWMutex.RLock()
defer userBalancesRWMutex.RUnlock()
timespans = append(timespans, firstSpan)
// split on (free) minute buckets
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
@@ -272,8 +270,6 @@ and will decrease it by 10% for nine times. So if the user has little credit it
If the user has no credit then it will return 0.
*/
func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) {
userBalancesRWMutex.RLock()
defer userBalancesRWMutex.RUnlock()
_, err = cd.SearchStorageForPrefix()
now := time.Now()
availableCredit, availableSeconds := 0.0, 0.0
@@ -317,8 +313,6 @@ func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) {
// Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method)
// from user's money balance.
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
cc, err = cd.GetCost()
if err != nil {
log.Printf("error getting cost %v", err)
@@ -342,8 +336,6 @@ Interface method used to add/substract an amount of cents from user's money bala
The amount filed has to be filled in call descriptor.
*/
func (cd *CallDescriptor) DebitCents() (left float64, err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
defer storageGetter.SetUserBalance(userBalance)
return userBalance.debitBalance(CREDIT, cd.Amount, true), nil
@@ -356,8 +348,6 @@ Interface method used to add/substract an amount of units from user's sms balanc
The amount filed has to be filled in call descriptor.
*/
func (cd *CallDescriptor) DebitSMS() (left float64, err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
defer storageGetter.SetUserBalance(userBalance)
return userBalance.debitBalance(SMS, cd.Amount, true), nil
@@ -370,8 +360,6 @@ Interface method used to add/substract an amount of seconds from user's minutes
The amount filed has to be filled in call descriptor.
*/
func (cd *CallDescriptor) DebitSeconds() (err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
defer storageGetter.SetUserBalance(userBalance)
return userBalance.debitMinutesBalance(cd.Amount, cd.Destination, true)
@@ -386,8 +374,6 @@ specified in the tariff plan is applied.
The amount filed has to be filled in call descriptor.
*/
func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) {
userBalancesRWMutex.Lock()
defer userBalancesRWMutex.Unlock()
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
a := &Action{
MinuteBucket: &MinuteBucket{Seconds: cd.Amount, DestinationId: cd.Destination},

View File

@@ -36,6 +36,7 @@ var (
DestinationCacheMap = make(destinationCacheMap)
)
// Gets the specified destination from the storage and caches it.
func GetDestination(dId string) (d *Destination, err error) {
d, exists := DestinationCacheMap[dId]
if !exists {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package timespans
import (
"reflect"
"testing"
)
@@ -63,3 +64,11 @@ func TestMinutBucketEqual(t *testing.T) {
t.Error("Equal failure!", mb1, mb2, mb3)
}
}
func TestMinutBucketClone(t *testing.T) {
mb1 := &MinuteBucket{Seconds: 1, Weight: 2, Price: 3, Percent: 4, DestinationId: "5"}
mb2 := mb1.Clone()
if mb1 == mb2 || !reflect.DeepEqual(mb1, mb2) {
t.Error("Cloning failure: ", mb1, mb2)
}
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package timespans
import (
"log"
"strconv"
"strings"
)
@@ -28,7 +29,7 @@ type UnitsCounter struct {
Direction string
BalanceId string
Units float64
MinuteBuckets []*MinuteBucket
MinuteBuckets bucketsorter
}
func (uc *UnitsCounter) initMinuteBuckets(ats []*ActionTrigger) {
@@ -43,17 +44,20 @@ func (uc *UnitsCounter) initMinuteBuckets(ats []*ActionTrigger) {
}
}
}
uc.MinuteBuckets.Sort()
}
// Adds the minutes from the received minute bucket to an existing bucket if the destination
// is the same or ads the minutye bucket to the list if none matches.
func (uc *UnitsCounter) addMinutes(newMb *MinuteBucket) {
if newMb == nil {
return
}
func (uc *UnitsCounter) addMinutes(amount float64, prefix string) {
for _, mb := range uc.MinuteBuckets {
if mb.Equal(newMb) {
mb.Seconds += newMb.Seconds
d, err := GetDestination(mb.DestinationId)
if err != nil {
log.Print("Minutes counter: unknown destination: ", mb.DestinationId)
continue
}
if ok, _ := d.containsPrefix(prefix); ok {
mb.Seconds += amount
break
}
}

View File

@@ -48,8 +48,7 @@ func TestUnitsCounterAddMinuteBucket(t *testing.T) {
Units: 100,
MinuteBuckets: []*MinuteBucket{&MinuteBucket{Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}},
}
newMb := &MinuteBucket{Weight: 20, Price: 1, DestinationId: "NEW"}
uc.addMinutes(newMb)
uc.addMinutes(20, "test")
if len(uc.MinuteBuckets) != 2 {
t.Error("Error adding minute bucket!")
}
@@ -62,22 +61,8 @@ func TestUnitsCounterAddMinuteBucketExists(t *testing.T) {
Units: 100,
MinuteBuckets: []*MinuteBucket{&MinuteBucket{Seconds: 10, Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}},
}
newMb := &MinuteBucket{Seconds: 5, Weight: 20, Price: 1, DestinationId: "NAT"}
uc.addMinutes(newMb)
uc.addMinutes(5, "0723")
if len(uc.MinuteBuckets) != 2 || uc.MinuteBuckets[0].Seconds != 15 {
t.Error("Error adding minute bucket!")
}
}
func TestUnitsCounterAddMinuteBucketNil(t *testing.T) {
uc := &UnitsCounter{
Direction: OUTBOUND,
BalanceId: SMS,
Units: 100,
MinuteBuckets: []*MinuteBucket{&MinuteBucket{Seconds: 10, Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}},
}
uc.addMinutes(nil)
if len(uc.MinuteBuckets) != 2 {
t.Error("Error adding minute bucket!")
}
}

View File

@@ -22,7 +22,6 @@ import (
"errors"
"strconv"
"strings"
"sync"
)
const (
@@ -43,8 +42,7 @@ const (
)
var (
storageGetter StorageGetter
userBalancesRWMutex sync.RWMutex
storageGetter StorageGetter
)
/*
@@ -237,13 +235,11 @@ func (ub *UserBalance) countUnits(a *Action) {
unitsCounter = &UnitsCounter{BalanceId: a.BalanceId}
ub.UnitCounters = append(ub.UnitCounters, unitsCounter)
}
if unitsCounter.BalanceId == MINUTES && a.MinuteBucket != nil {
unitsCounter.addMinutes(a.MinuteBucket)
goto TRIGGERS
if a.BalanceId == MINUTES && a.MinuteBucket != nil {
unitsCounter.addMinutes(a.MinuteBucket.Seconds, a.MinuteBucket.DestinationId)
} else {
unitsCounter.Units += a.Units
}
unitsCounter.Units += a.Units
TRIGGERS:
ub.executeActionTriggers()
}

View File

@@ -1,65 +0,0 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012 Radu Ioan Fericean
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package main
import (
"log"
"time"
)
var (
cm = NewChanMutex()
)
type Command struct {
name string
data string
}
func (c *Command) Execute() (err error) {
ch := cm.pipe[c.name]
ch <- c
log.Print(c.data)
time.Sleep(1 * time.Second)
log.Print("end ", c.data)
<-ch
return
}
type ChanMutex struct {
pipe map[string]chan *Command
}
func NewChanMutex() *ChanMutex {
return &ChanMutex{pipe: make(map[string]chan *Command)}
}
func (cm *ChanMutex) Execute(c *Command) {
if _, exists := cm.pipe[c.name]; !exists {
cm.pipe[c.name] = make(chan *Command, 1)
}
c.Execute()
}
func main() {
go cm.Execute(&Command{"vdf:rif", "prima rif"})
go cm.Execute(&Command{"vdf:dan", "prima Dan"})
go cm.Execute(&Command{"vdf:rif", "a doua rif"})
time.Sleep(5 * time.Second)
}