diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c8592fa0f..c5c2dc815 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "reflect" "runtime" "runtime/pprof" "strconv" @@ -109,16 +110,18 @@ func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, h } func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient + var err error delay := utils.Fib() - if cfg.SmFsConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder - } else { - var err error + for _, raterCfg := range cfg.SmFsConfig.HaRater { + if raterCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + raterConn = append(raterConn, responder) + continue + } for i := 0; i < cfg.SmFsConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Rater, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB) if err == nil { //Connected so no need to reiterate break } @@ -129,32 +132,34 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac exitChan <- true return } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) } - if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater { + if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) { cdrsConn = raterConn - } else if cfg.SmFsConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder - } else if len(cfg.SmFsConfig.Cdrs) != 0 { + } else if len(cfg.SmFsConfig.HaCdrs) != 0 { delay = utils.Fib() - for i := 0; i < cfg.SmFsConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Cdrs, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break + for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + cdrsConn = append(cdrsConn, responder) + continue } - time.Sleep(delay()) + for i := 0; i < cfg.SmFsConfig.Reconnects; i++ { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB) + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(delay()) + } + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) - exitChan <- true - return - } - cdrsConn = &engine.RPCClientConnector{Client: client} } - rcp := engine.ConnectorPool{raterConn} - ccp := engine.ConnectorPool{cdrsConn} - sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, rcp, ccp) + sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn) sms = append(sms, sm) smRpc.SMs = append(smRpc.SMs, sm) if err = sm.Connect(); err != nil { @@ -164,16 +169,19 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac } func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient - if cfg.SmKamConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder - } else { - var err error - delay := utils.Fib() + + var err error + delay := utils.Fib() + for _, raterCfg := range cfg.SmKamConfig.HaRater { + if raterCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + raterConn = append(raterConn, responder) + continue + } for i := 0; i < cfg.SmKamConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Rater, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB) if err == nil { //Connected so no need to reiterate break } @@ -183,28 +191,32 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater: %v", err)) exitChan <- true } - raterConn = &engine.RPCClientConnector{Client: client} - } - if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater { - cdrsConn = raterConn - } else if cfg.SmKamConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder - } else if len(cfg.SmKamConfig.Cdrs) != 0 { - delay := utils.Fib() - for i := 0; i < cfg.SmKamConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Cdrs, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) + if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) { + cdrsConn = raterConn + } else if len(cfg.SmKamConfig.HaCdrs) != 0 { + delay := utils.Fib() + for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + cdrsConn = append(cdrsConn, responder) + continue + } + for i := 0; i < cfg.SmKamConfig.Reconnects; i++ { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB) + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(delay()) + } + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) } - time.Sleep(delay()) } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) - exitChan <- true - return - } - cdrsConn = &engine.RPCClientConnector{Client: client} } sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn) sms = append(sms, sm) @@ -216,16 +228,19 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache } func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { - var raterConn, cdrsConn engine.Connector + var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient - if cfg.SmOsipsConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder - } else { - var err error - delay := utils.Fib() + + var err error + delay := utils.Fib() + for _, raterCfg := range cfg.SmOsipsConfig.HaRater { + if raterCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + raterConn = append(raterConn, responder) + continue + } for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Rater, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB) if err == nil { //Connected so no need to reiterate break } @@ -235,28 +250,33 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater: %v", err)) exitChan <- true } - raterConn = &engine.RPCClientConnector{Client: client} + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) } - if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater { + if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) { cdrsConn = raterConn - } else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder - } else if len(cfg.SmOsipsConfig.Cdrs) != 0 { - delay := utils.Fib() - for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Cdrs, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break + } + for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + cdrsConn = append(cdrsConn, responder) + continue + } + if len(cfg.SmOsipsConfig.HaCdrs) != 0 { + delay := utils.Fib() + for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB) + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(delay()) } - time.Sleep(delay()) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) - exitChan <- true - return - } - cdrsConn = &engine.RPCClientConnector{Client: client} } sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn) sms = append(sms, sm) @@ -547,7 +567,7 @@ func main() { }() wg.Wait() - responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats} + responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats, Timeout: 10 * time.Minute} apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats} apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}} diff --git a/config/config.go b/config/config.go index 69bfd0d0b..3c382c2c4 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,7 @@ var ( cgrCfg *CGRConfig // will be shared dfltFsConnConfig *FsConnConfig // Default FreeSWITCH Connection configuration, built out of json default configuration dfltKamConnConfig *KamConnConfig // Default Kamailio Connection configuration + dfltHaPoolConfig *HaPoolConfig ) // Used to retrieve system configuration from other packages @@ -289,46 +290,46 @@ func (self *CGRConfig) checkConfigSanity() error { } // SM-FreeSWITCH checks if self.SmFsConfig.Enabled { - if self.SmFsConfig.Rater == "" { + if len(self.SmFsConfig.HaRater) == 0 { return errors.New("Rater definition is mandatory!") } - if self.SmFsConfig.Cdrs == "" { + if len(self.SmFsConfig.HaCdrs) == 0 { return errors.New("Cdrs definition is mandatory!") } - if self.SmFsConfig.Rater == utils.INTERNAL && !self.RaterEnabled { + if self.SmFsConfig.HaRater[0].Server == utils.INTERNAL && !self.RaterEnabled { return errors.New("Rater not enabled but requested by SM-FreeSWITCH component.") } - if self.SmFsConfig.Cdrs == utils.INTERNAL && !self.CDRSEnabled { + if self.SmFsConfig.HaCdrs[0].Server == utils.INTERNAL && !self.CDRSEnabled { return errors.New("CDRS not enabled but referenced by SM-FreeSWITCH component") } } // SM-Kamailio checks if self.SmKamConfig.Enabled { - if self.SmKamConfig.Rater == "" { + if len(self.SmKamConfig.HaRater) == 0 { return errors.New("Rater definition is mandatory!") } - if self.SmKamConfig.Cdrs == "" { + if len(self.SmKamConfig.HaCdrs) == 0 { return errors.New("Cdrs definition is mandatory!") } - if self.SmKamConfig.Rater == utils.INTERNAL && !self.RaterEnabled { + if self.SmKamConfig.HaRater[0].Server == utils.INTERNAL && !self.RaterEnabled { return errors.New("Rater not enabled but requested by SM-Kamailio component.") } - if self.SmKamConfig.Cdrs == utils.INTERNAL && !self.CDRSEnabled { + if self.SmKamConfig.HaCdrs[0].Server == utils.INTERNAL && !self.CDRSEnabled { return errors.New("CDRS not enabled but referenced by SM-Kamailio component") } } // SM-OpenSIPS checks if self.SmOsipsConfig.Enabled { - if self.SmOsipsConfig.Rater == "" { + if len(self.SmOsipsConfig.HaRater) == 0 { return errors.New("Rater definition is mandatory!") } - if self.SmOsipsConfig.Cdrs == "" { + if len(self.SmOsipsConfig.HaCdrs) == 0 { return errors.New("Cdrs definition is mandatory!") } - if self.SmOsipsConfig.Rater == utils.INTERNAL && !self.RaterEnabled { + if self.SmOsipsConfig.HaRater[0].Server == utils.INTERNAL && !self.RaterEnabled { return errors.New("Rater not enabled but requested by SM-OpenSIPS component.") } - if self.SmOsipsConfig.Cdrs == utils.INTERNAL && !self.CDRSEnabled { + if self.SmOsipsConfig.HaCdrs[0].Server == utils.INTERNAL && !self.CDRSEnabled { return errors.New("CDRS not enabled but referenced by SM-OpenSIPS component") } } diff --git a/config/config_defaults.go b/config/config_defaults.go index 01b8a4c61..babc6765f 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -188,8 +188,12 @@ const CGRATES_CFG_JSON = ` "sm_freeswitch": { "enabled": false, // starts SessionManager service: - "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> - "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> + "ha_rater": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], + "ha_cdrs": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component "cdr_extra_fields": [], // extra fields to store in CDRs when creating them @@ -210,8 +214,12 @@ const CGRATES_CFG_JSON = ` "sm_kamailio": { "enabled": false, // starts SessionManager service: - "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> - "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> + "ha_rater": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], + "ha_cdrs": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component "debit_interval": "10s", // interval to perform debits on. @@ -226,8 +234,12 @@ const CGRATES_CFG_JSON = ` "sm_opensips": { "enabled": false, // starts SessionManager service: "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS - "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> - "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> + "ha_rater": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], + "ha_cdrs": [ + {"server": "internal", "timeout": "100ms", "time_to_live": "3s"} + ], "reconnects": 5, // number of reconnects if connection is lost "create_cdr": false, // create CDR out of events and sends them to CDRS component "debit_interval": "10s", // interval to perform debits on. diff --git a/config/config_json_test.go b/config/config_json_test.go index 696965792..4a0658c0f 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -312,9 +312,19 @@ func TestDfCdrcJsonCfg(t *testing.T) { func TestSmFsJsonCfg(t *testing.T) { eCfg := &SmFsJsonCfg{ - Enabled: utils.BoolPointer(false), - Rater: utils.StringPointer("internal"), - Cdrs: utils.StringPointer("internal"), + Enabled: utils.BoolPointer(false), + Ha_rater: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, + Ha_cdrs: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), Cdr_extra_fields: utils.StringSlicePointer([]string{}), @@ -343,9 +353,19 @@ func TestSmFsJsonCfg(t *testing.T) { func TestSmKamJsonCfg(t *testing.T) { eCfg := &SmKamJsonCfg{ - Enabled: utils.BoolPointer(false), - Rater: utils.StringPointer("internal"), - Cdrs: utils.StringPointer("internal"), + Enabled: utils.BoolPointer(false), + Ha_rater: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, + Ha_cdrs: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), Debit_interval: utils.StringPointer("10s"), @@ -367,10 +387,20 @@ func TestSmKamJsonCfg(t *testing.T) { func TestSmOsipsJsonCfg(t *testing.T) { eCfg := &SmOsipsJsonCfg{ - Enabled: utils.BoolPointer(false), - Listen_udp: utils.StringPointer("127.0.0.1:2020"), - Rater: utils.StringPointer("internal"), - Cdrs: utils.StringPointer("internal"), + Enabled: utils.BoolPointer(false), + Listen_udp: utils.StringPointer("127.0.0.1:2020"), + Ha_rater: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, + Ha_cdrs: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Server: utils.StringPointer("internal"), + Timeout: utils.StringPointer("100ms"), + Time_to_live: utils.StringPointer("3s"), + }}, Reconnects: utils.IntPointer(5), Create_cdr: utils.BoolPointer(false), Debit_interval: utils.StringPointer("10s"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index c1c6b8a37..287a455c2 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -145,8 +145,8 @@ type CdrcJsonCfg struct { // SM-FreeSWITCH config section type SmFsJsonCfg struct { Enabled *bool - Rater *string - Cdrs *string + Ha_rater *[]*HaPoolJsonCfg + Ha_cdrs *[]*HaPoolJsonCfg Reconnects *int Create_cdr *bool Cdr_extra_fields *[]string @@ -162,6 +162,13 @@ type SmFsJsonCfg struct { Connections *[]*FsConnJsonCfg } +// Represents one connection instance towards a rater/cdrs server +type HaPoolJsonCfg struct { + Server *string + Timeout *string + Time_to_live *string +} + // Represents one connection instance towards FreeSWITCH type FsConnJsonCfg struct { Server *string @@ -172,8 +179,8 @@ type FsConnJsonCfg struct { // SM-Kamailio config section type SmKamJsonCfg struct { Enabled *bool - Rater *string - Cdrs *string + Ha_rater *[]*HaPoolJsonCfg + Ha_cdrs *[]*HaPoolJsonCfg Reconnects *int Create_cdr *bool Debit_interval *string @@ -192,8 +199,8 @@ type KamConnJsonCfg struct { type SmOsipsJsonCfg struct { Enabled *bool Listen_udp *string - Rater *string - Cdrs *string + Ha_rater *[]*HaPoolJsonCfg + Ha_cdrs *[]*HaPoolJsonCfg Reconnects *int Create_cdr *bool Debit_interval *string diff --git a/config/smconfig.go b/config/smconfig.go index 0075783bd..6d4644988 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -19,10 +19,48 @@ along with this program. If not, see package config import ( - "github.com/cgrates/cgrates/utils" "time" + + "github.com/cgrates/cgrates/utils" ) +// Returns the first cached default value for a SM-FreeSWITCH connection +func NewDfltHaPoolConfig() *HaPoolConfig { + if dfltHaPoolConfig == nil { + return new(HaPoolConfig) // No defaults, most probably we are building the defaults now + } + dfltVal := *dfltHaPoolConfig // Copy the value instead of it's pointer + return &dfltVal +} + +// One connection to FreeSWITCH server +type HaPoolConfig struct { + Server string + Timeout time.Duration + TimeToLive time.Duration +} + +func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { + var err error + if jsnCfg == nil { + return nil + } + if jsnCfg.Server != nil { + self.Server = *jsnCfg.Server + } + if jsnCfg.Timeout != nil { + if self.Timeout, err = utils.ParseDurationWithSecs(*jsnCfg.Timeout); err != nil { + return err + } + } + if jsnCfg.Time_to_live != nil { + if self.TimeToLive, err = utils.ParseDurationWithSecs(*jsnCfg.Time_to_live); err != nil { + return err + } + } + return nil +} + // Returns the first cached default value for a SM-FreeSWITCH connection func NewDfltFsConnConfig() *FsConnConfig { if dfltFsConnConfig == nil { @@ -57,8 +95,8 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { type SmFsConfig struct { Enabled bool - Rater string - Cdrs string + HaRater []*HaPoolConfig + HaCdrs []*HaPoolConfig Reconnects int CreateCdr bool CdrExtraFields []*utils.RSRField @@ -82,11 +120,19 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { if jsnCfg.Enabled != nil { self.Enabled = *jsnCfg.Enabled } - if jsnCfg.Rater != nil { - self.Rater = *jsnCfg.Rater + if jsnCfg.Ha_rater != nil { + self.HaRater = make([]*HaPoolConfig, len(*jsnCfg.Ha_rater)) + for idx, jsnHaCfg := range *jsnCfg.Ha_rater { + self.HaRater[idx] = NewDfltHaPoolConfig() + self.HaRater[idx].loadFromJsonCfg(jsnHaCfg) + } } - if jsnCfg.Cdrs != nil { - self.Cdrs = *jsnCfg.Cdrs + if jsnCfg.Ha_cdrs != nil { + self.HaCdrs = make([]*HaPoolConfig, len(*jsnCfg.Ha_cdrs)) + for idx, jsnHaCfg := range *jsnCfg.Ha_cdrs { + self.HaCdrs[idx] = NewDfltHaPoolConfig() + self.HaCdrs[idx].loadFromJsonCfg(jsnHaCfg) + } } if jsnCfg.Reconnects != nil { self.Reconnects = *jsnCfg.Reconnects @@ -177,8 +223,8 @@ func (self *KamConnConfig) loadFromJsonCfg(jsnCfg *KamConnJsonCfg) error { // SM-Kamailio config section type SmKamConfig struct { Enabled bool - Rater string - Cdrs string + HaRater []*HaPoolConfig + HaCdrs []*HaPoolConfig Reconnects int CreateCdr bool DebitInterval time.Duration @@ -195,11 +241,19 @@ func (self *SmKamConfig) loadFromJsonCfg(jsnCfg *SmKamJsonCfg) error { if jsnCfg.Enabled != nil { self.Enabled = *jsnCfg.Enabled } - if jsnCfg.Rater != nil { - self.Rater = *jsnCfg.Rater + if jsnCfg.Ha_rater != nil { + self.HaRater = make([]*HaPoolConfig, len(*jsnCfg.Ha_rater)) + for idx, jsnHaCfg := range *jsnCfg.Ha_rater { + self.HaRater[idx] = NewDfltHaPoolConfig() + self.HaRater[idx].loadFromJsonCfg(jsnHaCfg) + } } - if jsnCfg.Cdrs != nil { - self.Cdrs = *jsnCfg.Cdrs + if jsnCfg.Ha_cdrs != nil { + self.HaCdrs = make([]*HaPoolConfig, len(*jsnCfg.Ha_cdrs)) + for idx, jsnHaCfg := range *jsnCfg.Ha_cdrs { + self.HaCdrs[idx] = NewDfltHaPoolConfig() + self.HaCdrs[idx].loadFromJsonCfg(jsnHaCfg) + } } if jsnCfg.Reconnects != nil { self.Reconnects = *jsnCfg.Reconnects @@ -252,8 +306,8 @@ func (self *OsipsConnConfig) loadFromJsonCfg(jsnCfg *OsipsConnJsonCfg) error { type SmOsipsConfig struct { Enabled bool ListenUdp string - Rater string - Cdrs string + HaRater []*HaPoolConfig + HaCdrs []*HaPoolConfig Reconnects int CreateCdr bool DebitInterval time.Duration @@ -271,11 +325,19 @@ func (self *SmOsipsConfig) loadFromJsonCfg(jsnCfg *SmOsipsJsonCfg) error { if jsnCfg.Listen_udp != nil { self.ListenUdp = *jsnCfg.Listen_udp } - if jsnCfg.Rater != nil { - self.Rater = *jsnCfg.Rater + if jsnCfg.Ha_rater != nil { + self.HaRater = make([]*HaPoolConfig, len(*jsnCfg.Ha_rater)) + for idx, jsnHaCfg := range *jsnCfg.Ha_rater { + self.HaRater[idx] = NewDfltHaPoolConfig() + self.HaRater[idx].loadFromJsonCfg(jsnHaCfg) + } } - if jsnCfg.Cdrs != nil { - self.Cdrs = *jsnCfg.Cdrs + if jsnCfg.Ha_cdrs != nil { + self.HaCdrs = make([]*HaPoolConfig, len(*jsnCfg.Ha_cdrs)) + for idx, jsnHaCfg := range *jsnCfg.Ha_cdrs { + self.HaCdrs[idx] = NewDfltHaPoolConfig() + self.HaCdrs[idx].loadFromJsonCfg(jsnHaCfg) + } } if jsnCfg.Reconnects != nil { self.Reconnects = *jsnCfg.Reconnects diff --git a/engine/responder.go b/engine/responder.go index 1d1edddb9..0e861c2b6 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -42,7 +42,6 @@ type SessionRun struct { } var ( - timeout = 100 * time.Millisecond timeToLive = 5 * time.Second responseCache = cache2go.NewResponseCache(timeToLive) ) @@ -52,6 +51,7 @@ type Responder struct { ExitChan chan bool CdrSrv *CdrServer Stats StatsInterface + Timeout time.Duration } /* @@ -455,6 +455,10 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error { return nil } +func (rs *Responder) GetTimeout() time.Duration { + return rs.Timeout +} + // Reflection worker type for not standalone balancer type ResponderWorker struct{} @@ -503,10 +507,12 @@ type Connector interface { ProcessCdr(*StoredCdr, *string) error LogCallCost(*CallCostLog, *string) error GetLCR(*CallDescriptor, *LCRCost) error + GetTimeout() time.Duration } type RPCClientConnector struct { - Client *rpcclient.RpcClient + Client *rpcclient.RpcClient + Timeout time.Duration } func (rcc *RPCClientConnector) GetCost(cd *CallDescriptor, cc *CallCost) error { @@ -553,6 +559,10 @@ func (rcc *RPCClientConnector) GetLCR(cd *CallDescriptor, reply *LCRCost) error return rcc.Client.Call("Responder.GetLCR", cd, reply) } +func (rcc *RPCClientConnector) GetTimeout() time.Duration { + return rcc.Timeout +} + type ConnectorPool []Connector func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error { @@ -564,7 +574,7 @@ func (cp ConnectorPool) GetCost(cd *CallDescriptor, cc *CallCost) error { case err := <-c: *cc = *callCost return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -580,7 +590,7 @@ func (cp ConnectorPool) Debit(cd *CallDescriptor, cc *CallCost) error { case err := <-c: *cc = *callCost return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -596,7 +606,7 @@ func (cp ConnectorPool) MaxDebit(cd *CallDescriptor, cc *CallCost) error { case err := <-c: *cc = *callCost return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -612,7 +622,7 @@ func (cp ConnectorPool) RefundIncrements(cd *CallDescriptor, resp *float64) erro case err := <-c: *resp = r return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -628,7 +638,7 @@ func (cp ConnectorPool) GetMaxSessionTime(cd *CallDescriptor, resp *float64) err case err := <-c: *resp = r return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -644,7 +654,7 @@ func (cp ConnectorPool) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) case err := <-c: *reply = r return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -660,7 +670,7 @@ func (cp ConnectorPool) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) erro case err := <-c: *sRuns = sr return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -676,7 +686,7 @@ func (cp ConnectorPool) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs case err := <-c: *dcs = derivedChargers return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -692,7 +702,7 @@ func (cp ConnectorPool) ProcessCdr(cdr *StoredCdr, reply *string) error { case err := <-c: *reply = r return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -708,7 +718,7 @@ func (cp ConnectorPool) LogCallCost(ccl *CallCostLog, reply *string) error { case err := <-c: *reply = r return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } @@ -724,9 +734,13 @@ func (cp ConnectorPool) GetLCR(cd *CallDescriptor, reply *LCRCost) error { case err := <-c: *reply = *lcrCost return err - case <-time.After(timeout): + case <-time.After(con.GetTimeout()): // call timed out, continue } } return utils.ErrTimedOut } + +func (cp ConnectorPool) GetTimeout() time.Duration { + return 0 +}