mirror of https://github.com/cgrates/cgrates.git

Remove Balancer
@@ -1,104 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package balancer2go

import (
    "sync"
)

// Balancer is the main balancer type.
type Balancer struct {
    sync.RWMutex
    clients         map[string]Worker
    balancerChannel chan Worker
}

// Worker is the interface implemented by RPC clients.
type Worker interface {
    Call(serviceMethod string, args interface{}, reply interface{}) error
    Close() error
}

// NewBalancer constructs a Balancer with an empty clients map and starts the goroutine feeding the balancing channel.
func NewBalancer() *Balancer {
    r := &Balancer{clients: make(map[string]Worker), balancerChannel: make(chan Worker)}
    go func() {
        for {
            if len(r.clients) > 0 {
                for _, c := range r.clients {
                    r.balancerChannel <- c
                }
            } else {
                r.balancerChannel <- nil
            }
        }
    }()
    return r
}

// AddClient adds a client to the internal clients map.
func (bl *Balancer) AddClient(address string, client Worker) {
    bl.Lock()
    defer bl.Unlock()
    bl.clients[address] = client
    return
}

// RemoveClient removes a client from the map, locking the readers and resetting the balancer index.
func (bl *Balancer) RemoveClient(address string) {
    bl.Lock()
    defer bl.Unlock()
    delete(bl.clients, address)
    <-bl.balancerChannel
}

// GetClient returns the client registered for the specified address.
func (bl *Balancer) GetClient(address string) (c Worker, exists bool) {
    bl.RLock()
    defer bl.RUnlock()
    c, exists = bl.clients[address]
    return
}

// Balance returns the next available connection at each call, looping at the end of the connections.
func (bl *Balancer) Balance() (result Worker) {
    bl.RLock()
    defer bl.RUnlock()
    return <-bl.balancerChannel
}

// Shutdown sends a shutdown call to all registered clients.
func (bl *Balancer) Shutdown(shutdownMethod string) {
    bl.Lock()
    defer bl.Unlock()
    var reply string
    for _, client := range bl.clients {
        client.Call(shutdownMethod, "", &reply)
    }
}

// GetClientAddresses returns a string slice with all client addresses.
func (bl *Balancer) GetClientAddresses() []string {
    bl.RLock()
    defer bl.RUnlock()
    var addresses []string
    for a, _ := range bl.clients {
        addresses = append(addresses, a)
    }
    return addresses
}
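For context on what the removed package provided, here is a minimal, illustrative usage sketch of the API above. The dialed address is a placeholder, and the snippet only builds against sources that still contain balancer2go:

package main

import (
    "log"
    "net/rpc"

    "github.com/cgrates/cgrates/balancer2go"
)

func main() {
    bal := balancer2go.NewBalancer()

    // *rpc.Client satisfies the Worker interface (Call + Close).
    c1, err := rpc.Dial("tcp", "127.0.0.1:2013") // placeholder rater address
    if err != nil {
        log.Fatal(err)
    }
    bal.AddClient("127.0.0.1:2013", c1)

    // Each Balance() call blocks on the internal channel and hands the
    // registered clients out in a round-robin fashion.
    w := bal.Balance()
    log.Printf("got a worker of type %T for the next RPC call", w)

    log.Println("registered raters:", bal.GetClientAddresses())
    bal.Shutdown("Responder.Shutdown") // asks every registered rater to shut down
}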
@@ -1,69 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package balancer2go

import (
    "net/rpc"
    "testing"
)

func BenchmarkBalance(b *testing.B) {
    balancer := NewBalancer()
    balancer.AddClient("client 1", new(rpc.Client))
    balancer.AddClient("client 2", new(rpc.Client))
    balancer.AddClient("client 3", new(rpc.Client))
    for i := 0; i < b.N; i++ {
        balancer.Balance()
    }
}

func TestRemoving(t *testing.T) {
    balancer := NewBalancer()
    c1 := new(rpc.Client)
    c2 := new(rpc.Client)
    c3 := new(rpc.Client)
    balancer.AddClient("client 1", c1)
    balancer.AddClient("client 2", c2)
    balancer.AddClient("client 3", c3)
    balancer.RemoveClient("client 2")
    if balancer.clients["client 1"] != c1 ||
        balancer.clients["client 3"] != c3 ||
        len(balancer.clients) != 2 {
        t.Error("Failed removing rater")
    }
}

func TestGet(t *testing.T) {
    balancer := NewBalancer()
    c1 := new(rpc.Client)
    balancer.AddClient("client 1", c1)
    result, ok := balancer.GetClient("client 1")
    if !ok || c1 != result {
        t.Error("Get failed")
    }
}

func TestOneBalancer(t *testing.T) {
    balancer := NewBalancer()
    balancer.AddClient("client 1", new(rpc.Client))
    c1 := balancer.Balance()
    c2 := balancer.Balance()
    if c1 != c2 {
        t.Error("With only one rater these shoud be equal")
    }
}
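The tests above hand `new(rpc.Client)` to AddClient wherever a Worker is expected; the relationship can be stated as a one-line compile-time assertion (a sketch against the pre-removal package):

package balancer2go

import "net/rpc"

// *rpc.Client provides Call and Close with exactly the signatures Worker requires.
var _ Worker = (*rpc.Client)(nil)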
@@ -32,7 +32,6 @@ import (
    "github.com/cgrates/cgrates/agents"
    "github.com/cgrates/cgrates/apier/v1"
    "github.com/cgrates/cgrates/apier/v2"
    "github.com/cgrates/cgrates/balancer2go"
    "github.com/cgrates/cgrates/cache"
    "github.com/cgrates/cgrates/cdrc"
    "github.com/cgrates/cgrates/config"
@@ -657,7 +656,6 @@ func main() {
    // Async starts here, will follow cgrates.json start order

    // Define internal connections via channels
    internalBalancerChan := make(chan *balancer2go.Balancer, 1)
    internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
    cacheDoneChan := make(chan struct{}, 1)
    internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -672,14 +670,9 @@ func main() {
    // Start ServiceManager
    srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan)

    // Start balancer service
    if cfg.BalancerEnabled {
        go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity
    }

    // Start rater service
    if cfg.RALsEnabled {
        go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
        go startRater(internalRaterChan, cacheDoneChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
            srvManager, server, dataDB, loadDb, cdrDb, &stopHandled, exitChan)
    }
@@ -22,20 +22,6 @@ import (
    "testing"
)

/*func TestRPCGet(t *testing.T) {
    client, err := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc")
    if err != nil {
        t.Error("Balancer server not started!")
        t.FailNow()
    }
    var reply string
    client.Call("Responder.Get", "test", &reply)
    const expect = "12223"
    if reply != expect {
        t.Errorf("replay == %v, want %v", reply, expect)
    }
}*/

func BenchmarkRPCGet(b *testing.B) {
    b.StopTimer()
    client, _ := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc")
@@ -19,11 +19,9 @@ package main

import (
    "fmt"
    "time"

    "github.com/cgrates/cgrates/apier/v1"
    "github.com/cgrates/cgrates/apier/v2"
    "github.com/cgrates/cgrates/balancer2go"
    "github.com/cgrates/cgrates/engine"
    "github.com/cgrates/cgrates/history"
    "github.com/cgrates/cgrates/servmanager"
@@ -42,15 +40,8 @@ import (
    gob.Register(engine.AliasValues{})
}*/

func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) {
    bal := balancer2go.NewBalancer()
    go stopBalancerSignalHandler(bal, exitChan)
    *stopHandled = true
    internalBalancerChan <- bal
}

// Starts rater and reports on chan
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer,
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{},
    internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
    internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
    serviceManager *servmanager.ServiceManager, server *utils.Server,
@@ -127,28 +118,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
        cacheDoneChan <- struct{}{}
    }()

    var bal *balancer2go.Balancer
    if cfg.RALsBalancer != "" { // Connection to balancer
        balTaskChan := make(chan struct{})
        waitTasks = append(waitTasks, balTaskChan)
        go func() {
            defer close(balTaskChan)
            if cfg.RALsBalancer == utils.MetaInternal {
                select {
                case bal = <-internalBalancerChan:
                    internalBalancerChan <- bal // Put it back if someone else is interested about
                case <-time.After(cfg.InternalTtl):
                    utils.Logger.Crit("<Rater>: Internal balancer connection timeout.")
                    exitChan <- true
                    return
                }
            } else {
                go registerToBalancer(exitChan)
                go stopRaterSignalHandler(internalCdrStatSChan, exitChan)
                *stopHandled = true
            }
        }()
    }
    var cdrStats *rpcclient.RpcClientPool
    if len(cfg.RALsCDRStatSConns) != 0 { // Connections to CDRStats
        cdrstatTaskChan := make(chan struct{})
@@ -228,7 +197,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
    for _, chn := range waitTasks {
        <-chn
    }
    responder := &engine.Responder{Bal: bal, ExitChan: exitChan}
    responder := &engine.Responder{ExitChan: exitChan}
    responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
    apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataDB: dataDB, CdrDb: cdrDb,
        Config: cfg, Responder: responder, ServManager: serviceManager, HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
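The removed balancer wiring above follows a handshake pattern that cgr-engine uses for all of its internal channels: a service publishes itself on a channel of capacity 1, and each consumer takes the value and immediately puts it back so other goroutines can obtain it too, with a timeout guarding against a service that never starts. A stripped-down sketch of that pattern (the `service` type and names are illustrative, not cgr-engine identifiers):

package main

import (
    "errors"
    "time"
)

type service struct{ name string }

// fetchService takes the published service off the channel and puts it
// straight back so any other interested goroutine can obtain it as well.
func fetchService(ch chan *service, ttl time.Duration) (*service, error) {
    select {
    case s := <-ch:
        ch <- s // put it back for the next consumer
        return s, nil
    case <-time.After(ttl):
        return nil, errors.New("internal service connection timeout")
    }
}

func main() {
    ch := make(chan *service, 1) // capacity 1, like the internal*Chan channels
    go func() { ch <- &service{name: "balancer"} }()
    if _, err := fetchService(ch, 2*time.Second); err != nil {
        panic(err)
    }
}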
@@ -19,38 +19,23 @@ package main

import (
    "fmt"
    "net/rpc"
    "os"
    "os/signal"
    "syscall"

    "github.com/cgrates/cgrates/balancer2go"
    "github.com/cgrates/cgrates/utils"
    "github.com/cgrates/rpcclient"
)

/*
Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered engines.
*/
func stopBalancerSignalHandler(bal *balancer2go.Balancer, exitChan chan bool) {
    c := make(chan os.Signal)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
    sig := <-c
    utils.Logger.Info(fmt.Sprintf("Caught signal %v, sending shutdown to engines\n", sig))
    bal.Shutdown("Responder.Shutdown")
    exitChan <- true
}

/*
Listens for the SIGTERM, SIGINT, SIGQUIT system signals, gracefully unregisters from the balancer and closes the storage before exiting.
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and closes the storage before exiting.
*/
func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
    c := make(chan os.Signal)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
    sig := <-c

    utils.Logger.Info(fmt.Sprintf("Caught signal %v, unregistering from balancer\n", sig))
    unregisterFromBalancer(exitChan)
    utils.Logger.Info(fmt.Sprintf("Caught signal %v", sig))
    var dummyInt int
    select {
    case cdrStats := <-internalCdrStatSChan:
@@ -60,45 +45,6 @@ func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnect
    exitChan <- true
}

/*
Connects to the balancer and calls the unregister RPC method.
*/
func unregisterFromBalancer(exitChan chan bool) {
    client, err := rpc.Dial("tcp", cfg.RALsBalancer)
    if err != nil {
        utils.Logger.Crit("Cannot contact the balancer!")
        exitChan <- true
        return
    }
    var reply int
    utils.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RALsBalancer))
    client.Call("Responder.UnRegisterRater", cfg.RPCGOBListen, &reply)
    if err := client.Close(); err != nil {
        utils.Logger.Crit("Could not close balancer unregistration!")
        exitChan <- true
    }
}

/*
Connects to the balancer and registers the engine to the server.
*/
func registerToBalancer(exitChan chan bool) {
    client, err := rpc.Dial("tcp", cfg.RALsBalancer)
    if err != nil {
        utils.Logger.Crit(fmt.Sprintf("Cannot contact the balancer: %v", err))
        exitChan <- true
        return
    }
    var reply int
    utils.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RALsBalancer))
    client.Call("Responder.RegisterRater", cfg.RPCGOBListen, &reply)
    if err := client.Close(); err != nil {
        utils.Logger.Crit("Could not close balancer registration!")
        exitChan <- true
    }
    utils.Logger.Info("Registration finished!")
}

/*
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and shuts down the session manager.
*/
@@ -220,7 +220,6 @@ type CGRConfig struct {
    LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks
    LogLevel int // system wide log level, nothing higher than this will be logged
    RALsEnabled bool // start standalone server (no balancer)
    RALsBalancer string // balancer address host:port
    RALsCDRStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234>
    RALsHistorySConns []*HaPoolConfig
    RALsPubSubSConns []*HaPoolConfig
@@ -228,7 +227,6 @@ type CGRConfig struct {
    RALsAliasSConns []*HaPoolConfig
    RpSubjectPrefixMatching bool // enables prefix matching for the rating profile subject
    LcrSubjectPrefixMatching bool // enables prefix matching for the lcr subject
    BalancerEnabled bool
    SchedulerEnabled bool
    CDRSEnabled bool // Enable CDR Server service
    CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
@@ -274,9 +272,6 @@ type CGRConfig struct {
func (self *CGRConfig) checkConfigSanity() error {
    // Rater checks
    if self.RALsEnabled {
        if self.RALsBalancer == utils.MetaInternal && !self.BalancerEnabled {
            return errors.New("Balancer not enabled but requested by Rater component.")
        }
        for _, connCfg := range self.RALsCDRStatSConns {
            if connCfg.Address == utils.MetaInternal && !self.CDRStatsEnabled {
                return errors.New("CDRStats not enabled but requested by Rater component.")
@@ -520,11 +515,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
        return err
    }

    jsnBalancerCfg, err := jsnCfg.BalancerJsonCfg()
    if err != nil {
        return err
    }

    jsnRALsCfg, err := jsnCfg.RalsJsonCfg()
    if err != nil {
        return err
@@ -781,9 +771,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
    if jsnRALsCfg.Enabled != nil {
        self.RALsEnabled = *jsnRALsCfg.Enabled
    }
    if jsnRALsCfg.Balancer != nil {
        self.RALsBalancer = *jsnRALsCfg.Balancer
    }
    if jsnRALsCfg.Cdrstats_conns != nil {
        self.RALsCDRStatSConns = make([]*HaPoolConfig, len(*jsnRALsCfg.Cdrstats_conns))
        for idx, jsnHaCfg := range *jsnRALsCfg.Cdrstats_conns {
@@ -826,15 +813,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
            self.LcrSubjectPrefixMatching = *jsnRALsCfg.Lcr_subject_prefix_matching
        }
    }

    if jsnBalancerCfg != nil && jsnBalancerCfg.Enabled != nil {
        self.BalancerEnabled = *jsnBalancerCfg.Enabled
    }

    if jsnSchedCfg != nil && jsnSchedCfg.Enabled != nil {
        self.SchedulerEnabled = *jsnSchedCfg.Enabled
    }

    if jsnCdrsCfg != nil {
        if jsnCdrsCfg.Enabled != nil {
            self.CDRSEnabled = *jsnCdrsCfg.Enabled
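The sanity rule removed above ties `RALsBalancer` to `BalancerEnabled`. A test-shaped sketch of that rule, illustrative only: it builds a CGRConfig literal (which the real code never does) and relies on the Rater checks running first, as shown in the hunk above:

package config

import "testing"

// Illustrative: a Rater requesting the *internal balancer requires the
// Balancer service to be enabled, otherwise checkConfigSanity errors out.
func TestBalancerSanitySketch(t *testing.T) {
    cfg := &CGRConfig{RALsEnabled: true, RALsBalancer: "*internal", BalancerEnabled: false}
    if err := cfg.checkConfigSanity(); err == nil {
        t.Error("expected error: balancer requested by Rater component but not enabled")
    }
}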
@@ -106,15 +106,8 @@ const CGRATES_CFG_JSON = `
    "cdrs_indexes": [],        // indexes on cdrs table to speed up queries, used only in case of mongo
},


"balancer": {
    "enabled": false,          // start Balancer service: <true|false>
},


"rals": {
    "enabled": false,          // enable Rater service: <true|false>
    "balancer": "",            // register to balancer as worker: <""|*internal|x.y.z.y:1234>
    "cdrstats_conns": [],      // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
    "historys_conns": [],      // address where to reach the history service, empty to disable history functionality: <""|*internal|x.y.z.y:1234>
    "pubsubs_conns": [],       // address where to reach the pubusb service, empty to disable pubsub functionality: <""|*internal|x.y.z.y:1234>
@@ -32,7 +32,6 @@ const (
    HTTP_JSN      = "http"
    DATADB_JSN    = "data_db"
    STORDB_JSN    = "stor_db"
    BALANCER_JSN  = "balancer"
    RALS_JSN      = "rals"
    SCHEDULER_JSN = "scheduler"
    CDRS_JSN      = "cdrs"
@@ -142,18 +141,6 @@ func (self CgrJsonCfg) DbJsonCfg(section string) (*DbJsonCfg, error) {
    return cfg, nil
}

func (self CgrJsonCfg) BalancerJsonCfg() (*BalancerJsonCfg, error) {
    rawCfg, hasKey := self[BALANCER_JSN]
    if !hasKey {
        return nil, nil
    }
    cfg := new(BalancerJsonCfg)
    if err := json.Unmarshal(*rawCfg, cfg); err != nil {
        return nil, err
    }
    return cfg, nil
}

func (self CgrJsonCfg) RalsJsonCfg() (*RalsJsonCfg, error) {
    rawCfg, hasKey := self[RALS_JSN]
    if !hasKey {
@@ -150,17 +150,8 @@ func TestDfDbJsonCfg(t *testing.T) {
    }
}

func TestDfBalancerJsonCfg(t *testing.T) {
    eCfg := &BalancerJsonCfg{Enabled: utils.BoolPointer(false)}
    if cfg, err := dfCgrJsonCfg.BalancerJsonCfg(); err != nil {
        t.Error(err)
    } else if !reflect.DeepEqual(eCfg, cfg) {
        t.Error("Received: ", cfg)
    }
}

func TestDfRalsJsonCfg(t *testing.T) {
    eCfg := &RalsJsonCfg{Enabled: utils.BoolPointer(false), Balancer: utils.StringPointer(""), Cdrstats_conns: &[]*HaPoolJsonCfg{},
    eCfg := &RalsJsonCfg{Enabled: utils.BoolPointer(false), Cdrstats_conns: &[]*HaPoolJsonCfg{},
        Historys_conns: &[]*HaPoolJsonCfg{}, Pubsubs_conns: &[]*HaPoolJsonCfg{}, Users_conns: &[]*HaPoolJsonCfg{}, Aliases_conns: &[]*HaPoolJsonCfg{},
        Rp_subject_prefix_matching: utils.BoolPointer(false), Lcr_subject_prefix_matching: utils.BoolPointer(false)}
    if cfg, err := dfCgrJsonCfg.RalsJsonCfg(); err != nil {
@@ -262,12 +262,6 @@ func TestCgrCfgJSONDefaultsStorDB(t *testing.T) {
    }
}

func TestCgrCfgJSONDefaultsBalancer(t *testing.T) {
    if cgrCfg.BalancerEnabled != false {
        t.Error(cgrCfg.BalancerEnabled)
    }
}

func TestCgrCfgJSONDefaultsRALs(t *testing.T) {

    eHaPoolcfg := []*HaPoolConfig{}
@@ -275,9 +269,6 @@ func TestCgrCfgJSONDefaultsRALs(t *testing.T) {
    if cgrCfg.RALsEnabled != false {
        t.Error(cgrCfg.RALsEnabled)
    }
    if cgrCfg.RALsBalancer != "" {
        t.Error(cgrCfg.RALsBalancer)
    }
    if !reflect.DeepEqual(cgrCfg.RALsCDRStatSConns, eHaPoolcfg) {
        t.Error(cgrCfg.RALsCDRStatSConns)
    }
@@ -69,15 +69,9 @@ type DbJsonCfg struct {
    Cdrs_indexes *[]string
}

// Balancer config section
type BalancerJsonCfg struct {
    Enabled *bool
}

// Rater config section
type RalsJsonCfg struct {
    Enabled        *bool
    Balancer       *string
    Cdrstats_conns *[]*HaPoolJsonCfg
    Historys_conns *[]*HaPoolJsonCfg
    Pubsubs_conns  *[]*HaPoolJsonCfg
@@ -87,14 +87,8 @@
// },


// "balancer": {
//     "enabled": false,        // start Balancer service: <true|false>
// },


// "rals": {
//     "enabled": false,        // enable Rater service: <true|false>
//     "balancer": "",          // register to balancer as worker: <""|*internal|x.y.z.y:1234>
//     "cdrstats_conns": [],    // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
//     "historys_conns": [],    // address where to reach the history service, empty to disable history functionality: <""|*internal|x.y.z.y:1234>
//     "pubsubs_conns": [],     // address where to reach the pubusb service, empty to disable pubsub functionality: <""|*internal|x.y.z.y:1234>
@@ -5,7 +5,7 @@ The general steps to get CGRateS operational are:

#. Create CSV files containing the initial data for CGRateS.
#. Load the data in the databases using the Loader application.
#. Start a Balancer or a Rater. If a Balancer is used, start one or more Raters serving that Balancer.
#. Start the SessionManager talking to your VoIP Switch or directly make API calls to the Balancer/Rater.
#. Make API calls to the Balancer/Rater or just let the SessionManager do the work.
#. Start a Rater.
#. Start the SessionManager talking to your VoIP Switch or directly make API calls to the Rater.
#. Make API calls to the Rater or just let the SessionManager do the work.
@@ -1,6 +1,6 @@
API Calls
=========
The general API usage of CGRateS involves creating a CallDescriptor structure, sending it to the balancer via JSON/GOB RPC, and getting a response from the balancer in the form of a CallCost structure or a numeric value for the requested information.
The general API usage of CGRateS involves creating a CallDescriptor structure, sending it to the rater via JSON/GOB RPC, and getting a response from the rater in the form of a CallCost structure or a numeric value for the requested information.

CallDescriptor structure
------------------------
@@ -48,11 +48,11 @@ Cost
ConnectFee
    The requested connection cost
Timespans
    The timespans in which the initial TimeStart-TimeEnd was split for cost determination, with all pricing and cost information attached.
    The timespans in which the initial TimeStart-TimeEnd was split for cost determination, with all pricing and cost information attached.

As stated before, the balancer (or the rater directly) can be accessed via JSON RPC.
As stated before, the rater can be accessed via JSON RPC.

The smallest Python snippet to access the CGRateS balancer is this:
The smallest Python snippet to access the CGRateS rater is this:

::
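The Python snippet referenced above is elided by the diff context. For orientation, a minimal equivalent client in Go over the standard net/rpc/jsonrpc package; the port follows the documentation's own example below, and the CallDescriptor fields shown here are illustrative and must match your configuration and tariff plan::

    package main

    import (
        "fmt"
        "log"
        "net/rpc/jsonrpc"
        "time"
    )

    // Only a few CallDescriptor fields are sketched; they follow the
    // structure documented above and are placeholders for real data.
    type CallDescriptor struct {
        Category, Tenant, Subject, Destination string
        TimeStart, TimeEnd                     time.Time
    }

    func main() {
        client, err := jsonrpc.Dial("tcp", "127.0.0.1:2001") // port taken from the example below
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        cd := CallDescriptor{
            Category: "call", Tenant: "cgrates.org",
            Subject: "1001", Destination: "1002",
            TimeStart: time.Now(), TimeEnd: time.Now().Add(60 * time.Second),
        }
        var cc map[string]interface{} // CallCost decoded generically
        if err := client.Call("Responder.GetCost", cd, &cc); err != nil {
            log.Fatal(err)
        }
        fmt.Println("cost:", cc["Cost"])
    }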
@@ -76,7 +76,7 @@ In the stress folder you can find a better example of python client using a clas
    rpc = JSONClient(("127.0.0.1", 2001))
    result = rpc.call("Responder.Get", cd)
    print result


Call API
--------
GetCost
@@ -102,7 +102,7 @@ DebitSeconds
    The amount field has to be filled in the call descriptor.

GetMaxSessionTime
    Returns the approximate max allowed session for the user budget. It will try the max amount received in the call descriptor
    Returns the approximate max allowed session for the user budget. It will try the max amount received in the call descriptor
    and will decrease it by 10% nine times, so if the user has little credit it will still allow 10% of the initial amount.
    If the user has no credit then it will return 0.
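A worked sketch of the search strategy described above, with a hypothetical ``affordable()`` predicate standing in for the real budget check::

    package main

    import "fmt"

    // maxSessionTime mirrors the documented behaviour: start from the requested
    // amount and step down by 10% of the initial value up to nine times,
    // returning 0 when even the smallest step cannot be afforded.
    func maxSessionTime(requested float64, affordable func(float64) bool) float64 {
        step := requested * 0.10
        for amount, tries := requested, 0; tries <= 9; amount, tries = amount-step, tries+1 {
            if affordable(amount) {
                return amount
            }
        }
        return 0
    }

    func main() {
        credit := 25.0 // hypothetical remaining credit
        // With 25 units of credit and 100 requested, the 10% steps settle on 20.
        fmt.Println(maxSessionTime(100, func(a float64) bool { return a <= credit }))
    }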
@@ -336,7 +336,7 @@ SetAcount
        Type            string // <*prepaid|*postpaid>
        ActionTimingsId string
    }


Example
    AddAccount(attr \*AttrAddAccount, reply \*string)
@@ -72,24 +72,7 @@ it will start on demand **one or more** service(s), outlined below.
.. hint:: # cgr-engine -config_dir=/etc/cgrates


2.1.1. Balancer service
~~~~~~~~~~~~~~~~~~~~~~~
*Optional* component, used as a proxy/balancer to a pool of RAL workers; a registration sketch follows this subsection.
The RALs register their availability to the Balancer, thus implementing **dynamic HA functionality**.
**HA functionality** can also be achieved *without* the **Balancer**.

- Communicates via:
   - RPC
   - internal/in-process *within the same running* **cgr-engine** process.

- Operates with the following CGRateS database(s): ::

   - none

- Config section in the CGRateS configuration file:
   - ``"balancer": {...}``
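A condensed sketch of the registration round-trip mentioned above, seen from the worker's side and mirroring the ``registerToBalancer`` helper removed elsewhere in this commit (both addresses are placeholders)::

    package main

    import (
        "log"
        "net/rpc"
    )

    func main() {
        // A RAL worker dials the Balancer and announces its own GOB listen
        // address; the Balancer stores the resulting connection in its pool.
        client, err := rpc.Dial("tcp", "balancer.example.org:2013") // placeholder balancer address
        if err != nil {
            log.Fatalf("cannot contact the balancer: %v", err)
        }
        defer client.Close()

        var reply int
        if err := client.Call("Responder.RegisterRater", "10.0.0.5:2013", &reply); err != nil {
            log.Fatalf("registration failed: %v", err)
        }
        log.Println("registered with the balancer")
    }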
2.1.2. RALs service
2.1.1. RALs service
~~~~~~~~~~~~~~~~~~~~
Responsible for the following tasks:
@@ -110,7 +93,7 @@ Responsible with the following tasks:
- Config section in the CGRateS configuration file:
   - ``"rals": {...}``

2.1.3. Scheduler service
2.1.2. Scheduler service
~~~~~~~~~~~~~~~~~~~~~~~~
Used to execute periodic/scheduled tasks.

@@ -124,7 +107,7 @@ Used to execute periodic/scheduled tasks.
- Config section in the CGRateS configuration file:
   - ``"scheduler": {...}``

2.1.4. SessionManager service
2.1.3. SessionManager service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Responsible for call control on the Telecommunication Switch side. Operates in two different modes (per call or globally):

@@ -164,7 +147,7 @@ Right now there are **five** session manager types.
   - ``"sm_asterisk": {...}``
   - ``"sm_generic": {...}``

2.1.5. DiameterAgent service
2.1.4. DiameterAgent service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Responsible for the communication with the Diameter server via the Diameter protocol.
Despite the name it is a flexible **Diameter Server**.
@@ -180,7 +163,7 @@ Despite the name it is a flexible **Diameter Server**.
- Config section in the CGRateS configuration file:
   - ``"diameter_agent": {...}``

2.1.6. CDRS service
2.1.5. CDRS service
~~~~~~~~~~~~~~~~~~~
Centralized CDR server and CDR (raw or rated) **replicator**.

@@ -196,7 +179,7 @@ Centralized CDR server and CDR (raw or rated) **replicator**.
- Config section in the CGRateS configuration file:
   - ``"cdrs": {...}``

2.1.7. CDRStats service
2.1.6. CDRStats service
~~~~~~~~~~~~~~~~~~~~~~~
Computes real-time CDR stats. Capable of real-time fraud detection and mitigation with triggered actions.

@@ -211,7 +194,7 @@ Computes real-time CDR stats. Capable with real-time fraud detection and mitigat
- Config section in the CGRateS configuration file:
   - ``"cdrstats": {...}``

2.1.8. CDRC service
2.1.7. CDRC service
~~~~~~~~~~~~~~~~~~~
Gathers offline CDRs and posts them to the CDR Server (CDRS component).

@@ -226,7 +209,7 @@ Gathers offline CDRs and post them to CDR Server - (CDRS component)
- Config section in the CGRateS configuration file:
   - ``"cdrc": {...}``

2.1.9. History service
2.1.8. History service
~~~~~~~~~~~~~~~~~~~~~~
Archives rate changes in human readable JSON format using **GIT**.

@@ -241,7 +224,7 @@ Archives rate changes in human readable JSON format using **GIT**.
- Config section in the CGRateS configuration file:
   - ``"historys": {...}``

2.1.10. Aliases service
2.1.9. Aliases service
~~~~~~~~~~~~~~~~~~~~~~~
Generic purpose **aliasing** system.

@@ -261,7 +244,7 @@ Possible applications:
- Config section in the CGRateS configuration file:
   - ``"aliases": {...}``

2.1.11. User service
2.1.10. User service
~~~~~~~~~~~~~~~~~~~~
Generic purpose **user** system to maintain user profiles (LDAP similarity).

@@ -276,7 +259,7 @@ Generic purpose **user** system to maintain user profiles (LDAP similarity).
- Config section in the CGRateS configuration file:
   - ``"users": {...}``

2.1.12. PubSub service
2.1.11. PubSub service
~~~~~~~~~~~~~~~~~~~~~~
PubSub service used to expose internal events to interested external components (eg: balance ops).

@@ -292,7 +275,7 @@ PubSub service used to expose internal events to interested external components
   - ``"pubsubs": {...}``


2.1.13. Resource Limiter service
2.1.12. Resource Limiter service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Resource Limiter service used to limit resources during authorization (eg: maximum calls per destination for an account).

@@ -307,7 +290,7 @@ Resource Limiter service used to limit resources during authorization (eg: maxim
- Config section in the CGRateS configuration file:
   - ``"rls": {...}``

2.1.14. APIER RPC service
2.1.13. APIER RPC service
~~~~~~~~~~~~~~~~~~~~~~~~~
RPC service used to expose external access towards internal components.

@@ -316,7 +299,7 @@ RPC service used to expose external access towards internal components.
   - JSON over HTTP
   - JSON over WebSocket

2.1.15. Cdre
2.1.14. Cdre
~~~~~~~~~~~~
Component to retrieve rated CDRs from the internal CDRs database.

@@ -329,7 +312,7 @@ Component to retrieve rated CDRs from internal CDRs database.
- Config section in the CGRateS configuration file:
   - ``"cdre": {...}``

2.1.16. Mailer
2.1.15. Mailer
~~~~~~~~~~~~~~
TBD

@@ -340,7 +323,7 @@ TBD
- Config section in the CGRateS configuration file:
   - ``"mailer": {...}``

2.1.17. Suretax
2.1.16. Suretax
~~~~~~~~~~~~~~~
TBD

@@ -446,7 +429,7 @@ Can be used to:

2.3. cgr-console
----------------
Command line tool used to interface with the RALs (and/or Balancer) service. Able to execute **sub-commands**.
Command line tool used to interface with the RALs service. Able to execute **sub-commands**.

::
@@ -3,11 +3,11 @@

`CGRateS`_ is a *very fast* and *easily scalable* **(charging, rating, accounting, lcr, mediation, billing, authorization)** *ENGINE* targeted especially at ISPs and Telecom Operators.

It is written in the `Go`_ programming language and is accessible from any programming language via JSON RPC.
It is written in the `Go`_ programming language and is accessible from any programming language via JSON RPC.
The code is well documented (**go doc** compliant `API docs`_) and heavily tested (**1300+** tests are part of the build system).

After testing various databases like `Kyoto Cabinet`_, `Apache Cassandra`_, `Redis`_ and `MongoDB`_,
the project focused on **Redis** as it delivers the best trade-off between speed, configuration and scalability.
After testing various databases like `Kyoto Cabinet`_, `Apache Cassandra`_, `Redis`_ and `MongoDB`_,
the project focused on **Redis** as it delivers the best trade-off between speed, configuration and scalability.

.. important:: `MongoDB`_ **full** support is now added.

@@ -26,17 +26,17 @@ To better understand the CGRateS architecture, below are some logical configurat
.. note:: **RALs** - is a CGRateS component and stands for RatingAccountingLCR service.

.. image:: images/Simple.png
This scenario fits most of the simple installations. The **Balancer** can be left out and the **RALs** can be queried directly.
This scenario fits most of the simple installations.

.. image:: images/Normal.png
While the network grows, more **RALs** can be thrown into the stack to offer a higher requests-per-second workload.
While the network grows, more **RALs** can be thrown into the stack to offer a higher requests-per-second workload.
This implies the usage of the **Balancer** to distribute the requests to the **RALs** running on the *different machines*.

.. image:: images/Normal_ha.png
Without Balancer using HA (broadcast) ....
Without Balancer using HA (broadcast) ....

.. image:: images/Complicated.png
Of course more **SessionManagers** can serve *multiple Telecom Switches* and all of them are connected to the same **Balancer**.
Of course more **SessionManagers** can serve *multiple Telecom Switches* and all of them are connected to the same **Balancer**.

.. image:: images/Complicated_ha.png
Without Balancer using HA (broadcast) ....
@@ -51,10 +51,9 @@ Without Balancer using HA (broadcast) ....
- Using most modern programming concepts like multiprocessor support, asynchronous code execution within microthreads.
- Built-in data caching system per call duration.
- In-Memory database with persistence over restarts.
- Use of the Balancer assures High-Availability of RALs as well as an increase of processing performance where that is required.
- Use of Linux enterprise ready tools to assure High-Availability of the Balancer where that is required (*Supervise* for Application level availability and *LinuxHA* for Host level availability).
- High-Availability of main components is now part of CGRateS core.

- High-Availability of main components is now part of CGRateS core.

- Modular architecture
   - Easy to enhance functionality by writing custom session managers or mediators.
   - Flexible API accessible via both **Gob** (Golang specific, increased performance) or **JSON** (platform independent, universally accessible).

@@ -2,33 +2,32 @@

1. Overview
===========
Starting as a pure **billing engine**, CGRateS has evolved over the years into a reliable **real-time charging framework** able to accommodate various business cases in a *generic way*.
Meant to be pluggable into existing billing infrastructure and as non-intrusive as possible,
Starting as a pure **billing engine**, CGRateS has evolved over the years into a reliable **real-time charging framework** able to accommodate various business cases in a *generic way*.
Meant to be pluggable into existing billing infrastructure and as non-intrusive as possible,
CGRateS passes the decisions about logic flow to system administrators and incorporates as little business logic as possible.

Being an *"engine style"* project, it focuses on providing the best ratio between **functionality** (
over 15 daemons/services implemented,
Multi-tenancy,
derived charging - eg: chaining of the business resellers,
account bundles,
LCR,
CDRStatS,
Diameter Server,
A-Number rating,
over 15 daemons/services implemented,
Multi-tenancy,
derived charging - eg: chaining of the business resellers,
account bundles,
LCR,
CDRStatS,
Diameter Server,
A-Number rating,
built-in High-Availability support,
agile in developing new features
)
agile in developing new features
)
and **performance** (
dedicated benchmark tool,
asynchronous request processing,
own transactional cache with the majority of handled data loaded on start or reloaded during runtime,
built-in balancer
)
dedicated benchmark tool,
asynchronous request processing,
own transactional cache with the majority of handled data loaded on start or reloaded during runtime,
)
however not losing focus of **quality** (over 1300 tests are part of the build environment).

Modular and flexible, CGRateS provides APIs over a variety of simultaneously accessible communication interfaces:
- **In-process** : optimal when there is no need to split services over different processes.
- **JSON over TCP** : most preferred due to its simplicity and readability.
- **JSON over TCP** : most preferred due to its simplicity and readability.
- **JSON over HTTP** : popular due to fast interoperability development.
- **JSON over Websockets** : useful where two-way interaction over the same TCP socket is required.
- **GOB over TCP** : slightly faster than the JSON one, but for the moment only accessible out of Go (`<https://golang.org/>`_).
@@ -46,7 +45,7 @@ CGRateS is capable of four charging modes
- Session authorization via events
- Charging done at the end of the session out of the CDR received
   - Advantage: less CPU intensive due to fewer events processed
   - Disadvantage: as balance updates happen only at the end of the session, there can be cost discrepancies in case of multiple sessions out of the same account
   - Disadvantage: as balance updates happen only at the end of the session, there can be cost discrepancies in case of multiple sessions out of the same account
     (including going on negative balance).

- \*postpaid
@@ -67,16 +66,16 @@ CGRateS is capable of four charging modes
- Primary component, offering the most functionality out of the subsystems.
- Computes replies based on a static list of "rules" defined in the TariffPlan.

2.1.1. Rater
2.1.1. Rater
~~~~~~~~~~~~
- Defines the performance of the system as a whole, being the "heart" component
- Support for multiple TypeOfRecord (**\*voice**, **\*data**, **\*sms**, **\*generic**)
- Time based calculations (activation time in the future/rate-destination timely coupled) with granular time definitions (year, month, month day, weekday, time in seconds)
- Compressed destination prefixes, helping on faster destination match as well as memory consumption
- Advanced Rating capabilities:
  ConnectFee (charged at the beginning of the session);
  RateUnit (automatic divider for the cost);
  RateIncrement (increase verbosity of the charging interval);
- Advanced Rating capabilities:
  ConnectFee (charged at the beginning of the session);
  RateUnit (automatic divider for the cost);
  RateIncrement (increase verbosity of the charging interval);
  Grouped interval rating inside the call duration (charging each second within a session independently)
- Per destination rounding: control the number of decimals displayed in costs, decide rounding methods (**\*up**, **\*down**, **\*middle**)
- Control of the MaxSessionCost with decision on the action taken on threshold hit (**\*free**, **\*disconnect**)
@@ -127,7 +126,7 @@ CGRateS is capable of four charging modes
- In-memory / performance oriented
- Unlimited StatQueues computing the same CDR event
- Flexible queue configuration (QueueLength, TimeWindow, Metrics, CDR field filters)
- Fraud detection with automatic mitigation through action triggers
- Fraud detection with automatic mitigation through action triggers

2.4. AliaseS
------------
@@ -205,7 +204,7 @@ CGRateS is capable of four charging modes
3. CGRateS Peripherals
======================
Packaged together due to common usage


3.1. cgr-engine
---------------
- Configured via .json files, incorporating the CGRateS subsystems mentioned above
@@ -241,10 +240,10 @@ Packaged together due to common usage
4.1. Fraud detection within Accounting:
---------------------------------------
- Events are happening in real-time, being available during updates (eg: every n seconds of a session).
- Thresholds set by the administrator react by calling a set of predefined actions **synchronously**
- Thresholds set by the administrator react by calling a set of predefined actions **synchronously**
  (with the advantage of having the account in a locked state, eg. no other events are possible until a decision is made) or **asynchronously** (unlocking the accounts faster)
- Two types of thresholds can be set
   - **min-/max-balance** monitoring balance values
- Two types of thresholds can be set
   - **min-/max-balance** monitoring balance values
   - **min-/max-usage** counters (eg: amount of minutes to a specific destination).
- Middle session control (sessions can be disconnected as fraud is detected
@@ -19,14 +19,11 @@ package engine

import (
    "errors"
    "fmt"
    "net/rpc"
    "reflect"
    "runtime"
    "strings"
    "time"

    "github.com/cgrates/cgrates/balancer2go"
    "github.com/cgrates/cgrates/cache"
    "github.com/cgrates/cgrates/config"
    "github.com/cgrates/cgrates/guardian"
@@ -48,7 +45,6 @@ type AttrGetLcr struct {
}

type Responder struct {
    Bal      *balancer2go.Balancer
    ExitChan chan bool
    Stats    rpcclient.RpcClientConnection
    Timeout  time.Duration
@@ -94,19 +90,14 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
    }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
        return err
    }
    if rs.Bal != nil {
        r, e := rs.getCallCost(arg, "Responder.GetCost")
        *reply, err = *r, e
    } else {
        r, e := guardian.Guardian.Guard(func() (interface{}, error) {
            return arg.GetCost()
        }, 0, arg.GetAccountKey())
        if r != nil {
            *reply = *r.(*CallCost)
        }
        if e != nil {
            return e
        }
    r, e := guardian.Guardian.Guard(func() (interface{}, error) {
        return arg.GetCost()
    }, 0, arg.GetAccountKey())
    if r != nil {
        *reply = *r.(*CallCost)
    }
    if e != nil {
        return e
    }
    return
}
@@ -132,17 +123,11 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
    }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
        return err
    }

    if rs.Bal != nil {
        r, e := rs.getCallCost(arg, "Responder.Debit")
        *reply, err = *r, e
    } else {
        r, e := arg.Debit()
        if e != nil {
            return e
        } else if r != nil {
            *reply = *r
        }
    r, e := arg.Debit()
    if e != nil {
        return e
    } else if r != nil {
        *reply = *r
    }
    return
}
@@ -175,20 +160,14 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
    }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
        return err
    }

    if rs.Bal != nil {
        r, e := rs.getCallCost(arg, "Responder.MaxDebit")
        *reply, err = *r, e
    } else {
        r, e := arg.MaxDebit()
        if e != nil {
            rs.getCache().Cache(cacheKey, &cache.CacheItem{
                Err: e,
            })
            return e
        } else if r != nil {
            *reply = *r
        }
    r, e := arg.MaxDebit()
    if e != nil {
        rs.getCache().Cache(cacheKey, &cache.CacheItem{
            Err: e,
        })
        return e
    } else if r != nil {
        *reply = *r
    }
    rs.getCache().Cache(cacheKey, &cache.CacheItem{
        Value: reply,
@@ -228,12 +207,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
        })
        return err
    }

    if rs.Bal != nil {
        *reply, err = rs.callMethod(arg, "Responder.RefundIncrements")
    } else {
        err = arg.RefundIncrements()
    }
    err = arg.RefundIncrements()
    rs.getCache().Cache(cacheKey, &cache.CacheItem{
        Value: reply,
        Err:   err,
@@ -272,12 +246,7 @@ func (rs *Responder) RefundRounding(arg *CallDescriptor, reply *float64) (err er
        })
        return err
    }

    if rs.Bal != nil {
        *reply, err = rs.callMethod(arg, "Responder.RefundRounding")
    } else {
        err = arg.RefundRounding()
    }
    err = arg.RefundRounding()
    rs.getCache().Cache(cacheKey, &cache.CacheItem{
        Value: reply,
        Err:   err,
@@ -306,21 +275,13 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
    }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
        return err
    }

    if rs.Bal != nil {
        *reply, err = rs.callMethod(arg, "Responder.GetMaxSessionTime")
    } else {
        r, e := arg.GetMaxSessionDuration()
        *reply, err = float64(r), e
    }
    r, e := arg.GetMaxSessionDuration()
    *reply, err = float64(r), e
    return
}

// Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it
func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error {
    if rs.Bal != nil {
        return errors.New("unsupported method on the balancer")
    }
    cacheKey := utils.GET_DERIV_MAX_SESS_TIME + ev.CGRID + ev.RunID
    if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
        if item.Value != nil {
@@ -426,9 +387,6 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error {

// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error {
    if rs.Bal != nil {
        return errors.New("Unsupported method on the balancer")
    }
    cacheKey := utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CGRID
    if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
        if item.Value != nil {
@@ -518,9 +476,6 @@ func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error {
}

func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
    if rs.Bal != nil {
        return errors.New("BALANCER_UNSUPPORTED_METHOD")
    }
    if dcsH, err := HandleGetDerivedChargers(dataStorage, attrs); err != nil {
        return err
    } else if dcsH != nil {
@@ -584,9 +539,6 @@ func (rs *Responder) Status(arg string, reply *map[string]interface{}) (err erro
    runtime.ReadMemStats(memstats)
    response := make(map[string]interface{})
    response[utils.InstanceID] = config.CgrConfig().InstanceID
    if rs.Bal != nil {
        response["Raters"] = rs.Bal.GetClientAddresses()
    }
    response["MemoryUsage"] = utils.SizeFmt(float64(memstats.HeapAlloc), "")
    response[utils.ActiveGoroutines] = runtime.NumGoroutine()
    response["Footprint"] = utils.SizeFmt(float64(memstats.Sys), "")
@@ -595,9 +547,6 @@ func (rs *Responder) Status(arg string, reply *map[string]interface{}) (err erro
}

func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
    if rs.Bal != nil {
        rs.Bal.Shutdown("Responder.Shutdown")
    }
    dataStorage.Close()
    cdrStorage.Close()
    defer func() { rs.ExitChan <- true }()
@@ -605,83 +554,6 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
    return
}

/*
The function that gets the information from the raters using the balancer.
*/
func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *CallCost, err error) {
    err = errors.New("") //not nil value
    for err != nil {
        client := rs.Bal.Balance()
        if client == nil {
            utils.Logger.Info("<Balancer> Waiting for raters to register...")
            time.Sleep(1 * time.Second) // wait one second and retry
        } else {
            _, err = guardian.Guardian.Guard(func() (interface{}, error) {
                err = client.Call(method, *key, reply)
                return reply, err
            }, 0, key.GetAccountKey())
            if err != nil {
                utils.Logger.Err(fmt.Sprintf("<Balancer> Got en error from rater: %v", err))
            }
        }
    }
    return
}

/*
The function that gets the information from the raters using the balancer.
*/
func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float64, err error) {
    err = errors.New("") //not nil value
    for err != nil {
        client := rs.Bal.Balance()
        if client == nil {
            utils.Logger.Info("Waiting for raters to register...")
            time.Sleep(1 * time.Second) // wait one second and retry
        } else {
            _, err = guardian.Guardian.Guard(func() (interface{}, error) {
                err = client.Call(method, *key, &reply)
                return reply, err
            }, 0, key.GetAccountKey())
            if err != nil {
                utils.Logger.Info(fmt.Sprintf("Got en error from rater: %v", err))
            }
        }
    }
    return
}

/*
RPC method that receives a rater address, connects to it and adds the pair to the rater list for balancing.
*/
func (rs *Responder) RegisterRater(clientAddress string, replay *int) error {
    utils.Logger.Info(fmt.Sprintf("Started rater %v registration...", clientAddress))
    time.Sleep(2 * time.Second) // wait a second for Rater to start serving
    client, err := rpc.Dial("tcp", clientAddress)
    if err != nil {
        utils.Logger.Err("Could not connect to client!")
        return err
    }
    rs.Bal.AddClient(clientAddress, client)
    utils.Logger.Info(fmt.Sprintf("Rater %v registered succesfully.", clientAddress))
    return nil
}

/*
RPC method that receives a rater address, gets the connection, closes it and removes the pair from the rater list.
*/
func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error {
    client, ok := rs.Bal.GetClient(clientAddress)
    if ok {
        client.Close()
        rs.Bal.RemoveClient(clientAddress)
        utils.Logger.Info(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
    } else {
        utils.Logger.Info(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
    }
    return nil
}

func (rs *Responder) GetTimeout(i int, d *time.Duration) error {
    *d = rs.Timeout
    return nil