diff --git a/cmd/cgr-balancer/cgr-balanncer.go b/cmd/cgr-balancer/cgr-balanncer.go index 8f2f7a770..e34e216d4 100644 --- a/cmd/cgr-balancer/cgr-balanncer.go +++ b/cmd/cgr-balancer/cgr-balanncer.go @@ -27,27 +27,24 @@ import ( "log" "runtime" "time" - "sync" ) var ( - raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)") - jsonRpcAddress = flag.String("jsonrpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)") - httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)") - freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") - freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") - js = flag.Bool("json", false, "use JSON for RPC encoding") - bal *balancer.Balancer - balancerRWMutex sync.RWMutex + raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)") + jsonRpcAddress = flag.String("jsonrpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)") + httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)") + freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") + freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") + js = flag.Bool("json", false, "use JSON for RPC encoding") + bal *balancer.Balancer + accLock = timespans.NewAccountLock() ) /* The function that gets the information from the raters using balancer. */ -func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost) { - // balancerRWMutex.RLock() - // defer balancerRWMutex.RUnlock() - err := errors.New("") //not nil value +func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) { + err = errors.New("") //not nil value for err != nil { client := bal.Balance() if client == nil { @@ -55,7 +52,10 @@ func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans time.Sleep(1 * time.Second) // wait one second and retry } else { reply = ×pans.CallCost{} - err = client.Call(method, *key, reply) + reply, err = timespans.AccLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) { + err = client.Call(method, *key, reply) + return reply, err + }) if err != nil { log.Printf("Got en error from rater: %v", err) } @@ -67,17 +67,20 @@ func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans /* The function that gets the information from the raters using balancer. */ -func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) { +func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) { // balancerRWMutex.Lock() // defer balancerRWMutex.Unlock() - err := errors.New("") //not nil value + err = errors.New("") //not nil value for err != nil { client := bal.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry } else { - err = client.Call(method, *key, &reply) + reply, err = timespans.AccLock.Guard(key.GetKey(), func() (float64, error) { + err = client.Call(method, *key, &reply) + return reply, err + }) if err != nil { log.Printf("Got en error from rater: %v", err) } diff --git a/cmd/cgr-balancer/http_responder.go b/cmd/cgr-balancer/http_responder.go index 436397775..058314a9c 100644 --- a/cmd/cgr-balancer/http_responder.go +++ b/cmd/cgr-balancer/http_responder.go @@ -45,7 +45,7 @@ func getCostHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0]} - callCost := GetCallCost(arg, "Responder.GetCost") + callCost, _ := GetCallCost(arg, "Responder.GetCost") enc.Encode(callCost) } @@ -67,7 +67,7 @@ func debitBalanceHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount} - result := CallMethod(arg, "Responder.DebitCents") + result, _ := CallMethod(arg, "Responder.DebitCents") enc.Encode(result) } @@ -89,7 +89,7 @@ func debitSMSHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount} - result := CallMethod(arg, "Responder.DebitSMS") + result, _ := CallMethod(arg, "Responder.DebitSMS") enc.Encode(result) } @@ -111,7 +111,7 @@ func debitSecondsHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount} - result := CallMethod(arg, "Responder.DebitSeconds") + result, _ := CallMethod(arg, "Responder.DebitSeconds") enc.Encode(result) } @@ -133,7 +133,7 @@ func getMaxSessionTimeHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount} - result := CallMethod(arg, "Responder.GetMaxSessionTime") + result, _ := CallMethod(arg, "Responder.GetMaxSessionTime") enc.Encode(result) } @@ -193,7 +193,7 @@ func addRecievedCallSeconds(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{Direction: direction[0], Tenant: tenant[0], TOR: tor[0], Subject: subj[0], Destination: dest[0], Amount: amount} - result := CallMethod(arg, "Responder.AddRecievedCallSeconds") + result, _ := CallMethod(arg, "Responder.AddRecievedCallSeconds") enc.Encode(result) } diff --git a/cmd/cgr-balancer/rpc_responder.go b/cmd/cgr-balancer/rpc_responder.go index e343019fa..e6dcd1cd3 100644 --- a/cmd/cgr-balancer/rpc_responder.go +++ b/cmd/cgr-balancer/rpc_responder.go @@ -33,37 +33,39 @@ type Responder byte RPC method thet provides the external RPC interface for getting the rating information. */ func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { - *replay = *GetCallCost(&arg, "Responder.GetCost") + rs, err := GetCallCost(&arg, "Responder.GetCost") + *replay = *rs return } func (r *Responder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { - *replay = *GetCallCost(&arg, "Responder.Debit") + rs, err := GetCallCost(&arg, "Responder.Debit") + *replay = *rs return } func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.DebitCents") + *replay, err = CallMethod(&arg, "Responder.DebitCents") return } func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.DebitSMS") + *replay, err = CallMethod(&arg, "Responder.DebitSMS") return } func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.DebitSeconds") + *replay, err = CallMethod(&arg, "Responder.DebitSeconds") return } func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.GetMaxSessionTime") + *replay, err = CallMethod(&arg, "Responder.GetMaxSessionTime") return } func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds") + *replay, err = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds") return } diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 1b989275e..5f7cd4bc8 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -56,38 +56,50 @@ func NewStorage(nsg timespans.StorageGetter) *Responder { RPC method providing the rating information from the storage. */ func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { - r, e := (&cd).GetCost() + r, e := timespans.AccLock.GuardGetCost(cd.GetKey(), func() (*timespans.CallCost, error) { + return (&cd).GetCost() + }) *reply, err = *r, e return err } func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := (&cd).DebitCents() + r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) { + return (&cd).DebitCents() + }) *reply, err = r, e return err } func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := (&cd).DebitSMS() + r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) { + return (&cd).DebitSMS() + }) *reply, err = r, e return err } func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { - e := (&cd).DebitSeconds() - *reply, err = 0.0, e + r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) { + return 0, (&cd).DebitSeconds() + }) + *reply, err = r, e return err } func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := (&cd).GetMaxSessionTime() + r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) { + return (&cd).GetMaxSessionTime() + }) *reply, err = r, e return err } func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { - e := (&cd).AddRecievedCallSeconds() - *reply, err = 0, e + r, e := timespans.AccLock.Guard(cd.GetKey(), func() (float64, error) { + return 0, (&cd).AddRecievedCallSeconds() + }) + *reply, err = r, e return err } diff --git a/timespans/accountlock.go b/timespans/accountlock.go new file mode 100644 index 000000000..76757a000 --- /dev/null +++ b/timespans/accountlock.go @@ -0,0 +1,57 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012 Radu Ioan Fericean + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package timespans + +var AccLock *AccountLock + +func init() { + AccLock = NewAccountLock() +} + +type AccountLock struct { + queue map[string]chan bool +} + +func NewAccountLock() *AccountLock { + return &AccountLock{queue: make(map[string]chan bool)} +} + +func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) { + lock, exists := AccLock.queue[name] + if !exists { + lock = make(chan bool, 1) + AccLock.queue[name] = lock + } + lock <- true + reply, err = handler() + <-lock + return +} + +func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) { + lock, exists := AccLock.queue[name] + if !exists { + lock = make(chan bool, 1) + AccLock.queue[name] = lock + } + lock <- true + reply, err = handler() + <-lock + return +} diff --git a/timespans/action_timing.go b/timespans/action_timing.go index 5f6bc0b12..2708aa6d5 100644 --- a/timespans/action_timing.go +++ b/timespans/action_timing.go @@ -150,8 +150,7 @@ func (at *ActionTiming) getUserBalances() (ubs []*UserBalance) { } func (at *ActionTiming) Execute() (err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() + // TODO: add sync mutex here aac, err := at.getActions() if err != nil { log.Print("Failed to get actions: ", err) diff --git a/timespans/calldesc.go b/timespans/calldesc.go index cdc54c78a..b52d5f5e8 100644 --- a/timespans/calldesc.go +++ b/timespans/calldesc.go @@ -175,8 +175,6 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) { Splits the received timespan into sub time spans according to the activation periods intervals. */ func (cd *CallDescriptor) splitTimeSpan(firstSpan *TimeSpan) (timespans []*TimeSpan) { - userBalancesRWMutex.RLock() - defer userBalancesRWMutex.RUnlock() timespans = append(timespans, firstSpan) // split on (free) minute buckets if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { @@ -272,8 +270,6 @@ and will decrease it by 10% for nine times. So if the user has little credit it If the user has no credit then it will return 0. */ func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) { - userBalancesRWMutex.RLock() - defer userBalancesRWMutex.RUnlock() _, err = cd.SearchStorageForPrefix() now := time.Now() availableCredit, availableSeconds := 0.0, 0.0 @@ -317,8 +313,6 @@ func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) { // Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method) // from user's money balance. func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() cc, err = cd.GetCost() if err != nil { log.Printf("error getting cost %v", err) @@ -342,8 +336,6 @@ Interface method used to add/substract an amount of cents from user's money bala The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitCents() (left float64, err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { defer storageGetter.SetUserBalance(userBalance) return userBalance.debitBalance(CREDIT, cd.Amount, true), nil @@ -356,8 +348,6 @@ Interface method used to add/substract an amount of units from user's sms balanc The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitSMS() (left float64, err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { defer storageGetter.SetUserBalance(userBalance) return userBalance.debitBalance(SMS, cd.Amount, true), nil @@ -370,8 +360,6 @@ Interface method used to add/substract an amount of seconds from user's minutes The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitSeconds() (err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { defer storageGetter.SetUserBalance(userBalance) return userBalance.debitMinutesBalance(cd.Amount, cd.Destination, true) @@ -386,8 +374,6 @@ specified in the tariff plan is applied. The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) { - userBalancesRWMutex.Lock() - defer userBalancesRWMutex.Unlock() if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { a := &Action{ MinuteBucket: &MinuteBucket{Seconds: cd.Amount, DestinationId: cd.Destination}, diff --git a/timespans/destinations.go b/timespans/destinations.go index 0e5eeb127..0e1c0b01f 100644 --- a/timespans/destinations.go +++ b/timespans/destinations.go @@ -36,6 +36,7 @@ var ( DestinationCacheMap = make(destinationCacheMap) ) +// Gets the specified destination from the storage and caches it. func GetDestination(dId string) (d *Destination, err error) { d, exists := DestinationCacheMap[dId] if !exists { diff --git a/timespans/minute_buckets_test.go b/timespans/minute_buckets_test.go index f2cfcb468..e649ab375 100644 --- a/timespans/minute_buckets_test.go +++ b/timespans/minute_buckets_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package timespans import ( + "reflect" "testing" ) @@ -63,3 +64,11 @@ func TestMinutBucketEqual(t *testing.T) { t.Error("Equal failure!", mb1, mb2, mb3) } } + +func TestMinutBucketClone(t *testing.T) { + mb1 := &MinuteBucket{Seconds: 1, Weight: 2, Price: 3, Percent: 4, DestinationId: "5"} + mb2 := mb1.Clone() + if mb1 == mb2 || !reflect.DeepEqual(mb1, mb2) { + t.Error("Cloning failure: ", mb1, mb2) + } +} diff --git a/timespans/units_counter.go b/timespans/units_counter.go index 57097457b..a52a53a44 100644 --- a/timespans/units_counter.go +++ b/timespans/units_counter.go @@ -19,6 +19,7 @@ along with this program. If not, see package timespans import ( + "log" "strconv" "strings" ) @@ -28,7 +29,7 @@ type UnitsCounter struct { Direction string BalanceId string Units float64 - MinuteBuckets []*MinuteBucket + MinuteBuckets bucketsorter } func (uc *UnitsCounter) initMinuteBuckets(ats []*ActionTrigger) { @@ -43,17 +44,20 @@ func (uc *UnitsCounter) initMinuteBuckets(ats []*ActionTrigger) { } } } + uc.MinuteBuckets.Sort() } // Adds the minutes from the received minute bucket to an existing bucket if the destination // is the same or ads the minutye bucket to the list if none matches. -func (uc *UnitsCounter) addMinutes(newMb *MinuteBucket) { - if newMb == nil { - return - } +func (uc *UnitsCounter) addMinutes(amount float64, prefix string) { for _, mb := range uc.MinuteBuckets { - if mb.Equal(newMb) { - mb.Seconds += newMb.Seconds + d, err := GetDestination(mb.DestinationId) + if err != nil { + log.Print("Minutes counter: unknown destination: ", mb.DestinationId) + continue + } + if ok, _ := d.containsPrefix(prefix); ok { + mb.Seconds += amount break } } diff --git a/timespans/units_counter_test.go b/timespans/units_counter_test.go index 3ebb4d710..15161e803 100644 --- a/timespans/units_counter_test.go +++ b/timespans/units_counter_test.go @@ -48,8 +48,7 @@ func TestUnitsCounterAddMinuteBucket(t *testing.T) { Units: 100, MinuteBuckets: []*MinuteBucket{&MinuteBucket{Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}}, } - newMb := &MinuteBucket{Weight: 20, Price: 1, DestinationId: "NEW"} - uc.addMinutes(newMb) + uc.addMinutes(20, "test") if len(uc.MinuteBuckets) != 2 { t.Error("Error adding minute bucket!") } @@ -62,22 +61,8 @@ func TestUnitsCounterAddMinuteBucketExists(t *testing.T) { Units: 100, MinuteBuckets: []*MinuteBucket{&MinuteBucket{Seconds: 10, Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}}, } - newMb := &MinuteBucket{Seconds: 5, Weight: 20, Price: 1, DestinationId: "NAT"} - uc.addMinutes(newMb) + uc.addMinutes(5, "0723") if len(uc.MinuteBuckets) != 2 || uc.MinuteBuckets[0].Seconds != 15 { t.Error("Error adding minute bucket!") } } - -func TestUnitsCounterAddMinuteBucketNil(t *testing.T) { - uc := &UnitsCounter{ - Direction: OUTBOUND, - BalanceId: SMS, - Units: 100, - MinuteBuckets: []*MinuteBucket{&MinuteBucket{Seconds: 10, Weight: 20, Price: 1, DestinationId: "NAT"}, &MinuteBucket{Weight: 10, Price: 10, Percent: 0, DestinationId: "RET"}}, - } - uc.addMinutes(nil) - if len(uc.MinuteBuckets) != 2 { - t.Error("Error adding minute bucket!") - } -} diff --git a/timespans/userbalance.go b/timespans/userbalance.go index 649ca4d37..10fb57a10 100644 --- a/timespans/userbalance.go +++ b/timespans/userbalance.go @@ -22,7 +22,6 @@ import ( "errors" "strconv" "strings" - "sync" ) const ( @@ -43,8 +42,7 @@ const ( ) var ( - storageGetter StorageGetter - userBalancesRWMutex sync.RWMutex + storageGetter StorageGetter ) /* @@ -237,13 +235,11 @@ func (ub *UserBalance) countUnits(a *Action) { unitsCounter = &UnitsCounter{BalanceId: a.BalanceId} ub.UnitCounters = append(ub.UnitCounters, unitsCounter) } - - if unitsCounter.BalanceId == MINUTES && a.MinuteBucket != nil { - unitsCounter.addMinutes(a.MinuteBucket) - goto TRIGGERS + if a.BalanceId == MINUTES && a.MinuteBucket != nil { + unitsCounter.addMinutes(a.MinuteBucket.Seconds, a.MinuteBucket.DestinationId) + } else { + unitsCounter.Units += a.Units } - unitsCounter.Units += a.Units -TRIGGERS: ub.executeActionTriggers() } diff --git a/userlock/userlock.go b/userlock/userlock.go deleted file mode 100644 index 624d2a4ed..000000000 --- a/userlock/userlock.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "log" - "time" -) - -var ( - cm = NewChanMutex() -) - -type Command struct { - name string - data string -} - -func (c *Command) Execute() (err error) { - ch := cm.pipe[c.name] - ch <- c - log.Print(c.data) - time.Sleep(1 * time.Second) - log.Print("end ", c.data) - <-ch - return -} - -type ChanMutex struct { - pipe map[string]chan *Command -} - -func NewChanMutex() *ChanMutex { - return &ChanMutex{pipe: make(map[string]chan *Command)} -} - -func (cm *ChanMutex) Execute(c *Command) { - if _, exists := cm.pipe[c.name]; !exists { - cm.pipe[c.name] = make(chan *Command, 1) - } - c.Execute() -} - -func main() { - go cm.Execute(&Command{"vdf:rif", "prima rif"}) - go cm.Execute(&Command{"vdf:dan", "prima Dan"}) - go cm.Execute(&Command{"vdf:rif", "a doua rif"}) - time.Sleep(5 * time.Second) -}