Multiple SessionManagers started out out of the same engine, configuration refactoring completed for SessionManagers

This commit is contained in:
DanB
2015-03-06 20:55:31 +01:00
parent a0d495647d
commit 96d4b6c28b
12 changed files with 252 additions and 452 deletions

View File

@@ -145,16 +145,16 @@ func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, h
exitChan <- true // If run stopped, something is bad, stop the application
}
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
func startSmFreeSWITCH(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SMRater == utils.INTERNAL {
if cfg.SmFsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMReconnects, utils.GOB)
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Rater, 0, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
@@ -166,14 +166,14 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SMCdrS == cfg.SMRater {
if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater {
cdrsConn = raterConn
} else if cfg.SMCdrS == utils.INTERNAL {
} else if cfg.SmFsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SMCdrS) != 0 {
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMCdrS, 0, cfg.SMReconnects, utils.GOB)
} else if len(cfg.SmFsConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Cdrs, 0, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
@@ -186,23 +186,109 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
switch cfg.SMSwitchType {
case FS:
sm = sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
case KAMAILIO:
sm, _ = sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, loggerDb)
case OSIPS:
sm, _ = sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
default:
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))
exitChan <- true
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
exitChan <- true
}
func startSmKamailio(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SmKamConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Rater, 0, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater {
cdrsConn = raterConn
} else if cfg.SmKamConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmKamConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Cdrs, 0, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, loggerDb)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
exitChan <- true
}
func startSmOpenSIPS(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SmOsipsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Rater, 0, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater {
cdrsConn = raterConn
} else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmOsipsConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Cdrs, 0, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
}
exitChan <- true
}
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) {
if cfg.CDRSMediator == utils.INTERNAL {
<-mediChan // Deadlock if mediator not started
@@ -280,10 +366,12 @@ func serveHttp(httpWaitChans []chan struct{}) {
}
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
/*
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
*/
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!")
return errors.New("Improperly configured balancer")
@@ -389,12 +477,6 @@ func main() {
}
engine.SetRoundingDecimals(cfg.RoundingDecimals)
if cfg.SMDebitInterval > 0 {
if dp, err := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)); err == nil {
engine.SetDebitPeriod(dp)
}
}
stopHandled := false
// Async starts here
@@ -492,12 +574,20 @@ func main() {
go startCDRS(responder, cdrDb, medChan, cdrsChan)
}
if cfg.SMEnabled {
engine.Logger.Info("Starting CGRateS SessionManager service.")
go startSessionManager(responder, logDb, cacheChan)
if cfg.SmFsConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
go startSmFreeSWITCH(responder, logDb, cacheChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler()
}
if cfg.SmKamConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-Kamailio service.")
go startSmKamailio(responder, logDb, cacheChan)
}
if cfg.SmOsipsConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
go startSmOpenSIPS(responder, logDb, cacheChan)
}
var cdrcEnabled bool
for _, cdrcCfgs := range cfg.CdrcProfiles {
var cdrcCfg *config.CdrcConfig

View File

@@ -42,10 +42,9 @@ const (
)
var (
cgrCfg *CGRConfig // will be shared
dfltFsConnConfig *FsConnConfig // Default FreeSWITCH Connection configuration, built out of json default configuration
dfltKamConnConfig *KamConnConfig // Default Kamailio Connection configuration
dfltOsipsConnConfig *OsipsConnConfig // Default OpenSIPS Connection configuration
cgrCfg *CGRConfig // will be shared
dfltFsConnConfig *FsConnConfig // Default FreeSWITCH Connection configuration, built out of json default configuration
dfltKamConnConfig *KamConnConfig // Default Kamailio Connection configuration
)
// Used to retrieve system configuration from other packages
@@ -70,6 +69,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
if err != nil {
return nil, err
}
cfg.MaxCallDuration = time.Duration(3) * time.Hour // Hardcoded for now
if err := cfg.loadFromJsonCfg(cgrJsonCfg); err != nil {
return nil, err
}
@@ -147,91 +147,70 @@ func NewCGRConfigFromFolder(cfgDir string) (*CGRConfig, error) {
// Holds system configuration, defaults are overwritten with values from config file if found
type CGRConfig struct {
RatingDBType string
RatingDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
RatingDBPort string // The port to bind to.
RatingDBName string // The name of the database to connect to.
RatingDBUser string // The user to sign in as.
RatingDBPass string // The user's password.
AccountDBType string
AccountDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
AccountDBPort string // The port to bind to.
AccountDBName string // The name of the database to connect to.
AccountDBUser string // The user to sign in as.
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
StorDBMaxOpenConns int // Maximum database connections opened
StorDBMaxIdleConns int // Maximum idle connections to keep opened
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultSubject string // set default rating subject, useful in case of fallback
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
BalancerEnabled bool
SchedulerEnabled bool
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSStats string // Address where to reach the Mediator. <""|intenal>
CDRSStoreDisable bool // When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario
CDRStatsEnabled bool // Enable CDR Stats service
CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
CdreProfiles map[string]*CdreConfig
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMCdrS string // Connection towards CDR server
SMReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMMaxCallDuration time.Duration // The maximum duration of a call
SMMinCallDuration time.Duration // Only authorize calls with allowed duration bigger than this
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorReconnects int // Number of reconnects to rater before giving up.
MediatorRater string
MediatorStats string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorStoreDisable bool // When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
FSMinDurLowBalance time.Duration // Threshold which will trigger low balance warnings
FSLowBalanceAnnFile string // File to be played when low balance is reached
FSEmptyBalanceContext string // If defined, call will be transfered to this context on empty balance
FSEmptyBalanceAnnFile string // File to be played before disconnecting prepaid calls (applies only if no context defined)
FSCdrExtraFields []*utils.RSRField // Extra fields to store in CDRs in case of processing them
OsipsListenUdp string // Address where to listen for event datagrams coming from OpenSIPS
OsipsMiAddr string // Adress where to reach OpenSIPS mi_datagram module
OsipsEvSubscInterval time.Duration // Refresh event subscription at this interval
OsipsReconnects int // Number of attempts on connect failure.
KamailioEvApiAddr string // Address of the kamailio evapi server
KamailioReconnects int // Number of reconnect attempts on connection lost
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryDir string // Location on disk where to store history files.
HistorySaveInterval time.Duration // The timout duration between history writes
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
MailerAuthPass string // Authenticate to email server with this password
MailerFromAddr string // From address used when sending emails out
DataFolderPath string // Path towards data folder, for tests internal usage, not loading out of .json options
ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
RatingDBType string
RatingDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
RatingDBPort string // The port to bind to.
RatingDBName string // The name of the database to connect to.
RatingDBUser string // The user to sign in as.
RatingDBPass string // The user's password.
AccountDBType string
AccountDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
AccountDBPort string // The port to bind to.
AccountDBName string // The name of the database to connect to.
AccountDBUser string // The user to sign in as.
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
StorDBMaxOpenConns int // Maximum database connections opened
StorDBMaxIdleConns int // Maximum idle connections to keep opened
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultSubject string // set default rating subject, useful in case of fallback
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
BalancerEnabled bool
SchedulerEnabled bool
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSStats string // Address where to reach the Mediator. <""|intenal>
CDRSStoreDisable bool // When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario
CDRStatsEnabled bool // Enable CDR Stats service
CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
CdreProfiles map[string]*CdreConfig
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorReconnects int // Number of reconnects to rater before giving up.
MediatorRater string
MediatorStats string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorStoreDisable bool // When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryDir string // Location on disk where to store history files.
HistorySaveInterval time.Duration // The timout duration between history writes
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
MailerAuthPass string // Authenticate to email server with this password
MailerFromAddr string // From address used when sending emails out
DataFolderPath string // Path towards data folder, for tests internal usage, not loading out of .json options
ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
// Cache defaults loaded from json and needing clones
dfltCdreProfile *CdreConfig // Default cdreConfig profile
dfltCdrcProfile *CdrcConfig // Default cdrcConfig profile
@@ -262,9 +241,11 @@ func (self *CGRConfig) checkConfigSanity() error {
if self.MediatorStats == utils.INTERNAL && !self.CDRStatsEnabled {
return errors.New("CDRStats not enabled but requested by Mediator.")
}
if self.SMCdrS == utils.INTERNAL && !self.CDRSEnabled {
return errors.New("CDRS not enabled but requested by SessionManager")
}
/*
if self.SMCdrS == utils.INTERNAL && !self.CDRSEnabled {
return errors.New("CDRS not enabled but requested by SessionManager")
}
*/
return nil
}
@@ -323,22 +304,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if err != nil {
return err
}
jsnSMCfg, err := jsnCfg.SessionManagerJsonCfg()
if err != nil {
return err
}
jsnFSCfg, err := jsnCfg.FSJsonCfg()
if err != nil {
return err
}
jsnKamCfg, err := jsnCfg.KamailioJsonCfg()
if err != nil {
return err
}
jsnOsipsCfg, err := jsnCfg.OsipsJsonCfg()
if err != nil {
return err
}
jsnSmFsCfg, err := jsnCfg.SmFsJsonCfg()
if err != nil {
return err
@@ -549,90 +514,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
}
if jsnSMCfg != nil {
if jsnSMCfg.Enabled != nil {
self.SMEnabled = *jsnSMCfg.Enabled
}
if jsnSMCfg.Switch_type != nil {
self.SMSwitchType = *jsnSMCfg.Switch_type
}
if jsnSMCfg.Rater != nil {
self.SMRater = *jsnSMCfg.Rater
}
if jsnSMCfg.Cdrs != nil {
self.SMCdrS = *jsnSMCfg.Cdrs
}
if jsnSMCfg.Reconnects != nil {
self.SMReconnects = *jsnSMCfg.Reconnects
}
if jsnSMCfg.Debit_interval != nil {
self.SMDebitInterval = *jsnSMCfg.Debit_interval
}
if jsnSMCfg.Max_call_duration != nil {
if self.SMMaxCallDuration, err = utils.ParseDurationWithSecs(*jsnSMCfg.Max_call_duration); err != nil {
return err
}
}
if jsnSMCfg.Min_call_duration != nil {
if self.SMMinCallDuration, err = utils.ParseDurationWithSecs(*jsnSMCfg.Min_call_duration); err != nil {
return err
}
}
}
if jsnFSCfg != nil {
if jsnFSCfg.Server != nil {
self.FreeswitchServer = *jsnFSCfg.Server
}
if jsnFSCfg.Password != nil {
self.FreeswitchPass = *jsnFSCfg.Password
}
if jsnFSCfg.Reconnects != nil {
self.FreeswitchReconnects = *jsnFSCfg.Reconnects
}
if jsnFSCfg.Min_dur_low_balance != nil {
if self.FSMinDurLowBalance, err = utils.ParseDurationWithSecs(*jsnFSCfg.Min_dur_low_balance); err != nil {
return err
}
}
if jsnFSCfg.Low_balance_ann_file != nil {
self.FSLowBalanceAnnFile = *jsnFSCfg.Low_balance_ann_file
}
if jsnFSCfg.Empty_balance_context != nil {
self.FSEmptyBalanceContext = *jsnFSCfg.Empty_balance_context
}
if jsnFSCfg.Empty_balance_ann_file != nil {
self.FSEmptyBalanceAnnFile = *jsnFSCfg.Empty_balance_ann_file
}
if jsnFSCfg.Cdr_extra_fields != nil {
if self.FSCdrExtraFields, err = utils.ParseRSRFieldsFromSlice(*jsnFSCfg.Cdr_extra_fields); err != nil {
return err
}
}
}
if jsnOsipsCfg != nil {
if jsnOsipsCfg.Listen_udp != nil {
self.OsipsListenUdp = *jsnOsipsCfg.Listen_udp
}
if jsnOsipsCfg.Mi_addr != nil {
self.OsipsMiAddr = *jsnOsipsCfg.Mi_addr
}
if jsnOsipsCfg.Events_subscribe_interval != nil {
if self.OsipsEvSubscInterval, err = utils.ParseDurationWithSecs(*jsnOsipsCfg.Events_subscribe_interval); err != nil {
return err
}
}
if jsnOsipsCfg.Reconnects != nil {
self.OsipsReconnects = *jsnOsipsCfg.Reconnects
}
}
if jsnKamCfg != nil {
if jsnKamCfg.Evapi_addr != nil {
self.KamailioEvApiAddr = *jsnKamCfg.Evapi_addr
}
if jsnKamCfg.Reconnects != nil {
self.KamailioReconnects = *jsnKamCfg.Reconnects
}
}
if jsnSmFsCfg != nil {
if err := self.SmFsConfig.loadFromJsonCfg(jsnSmFsCfg); err != nil {
return err

View File

@@ -200,42 +200,11 @@ const CGRATES_CFG_JSON = `
}
},
"session_manager": {
"enabled": false, // starts SessionManager service: <true|false>
"switch_type": "freeswitch", // defines the type of switch behind: <freeswitch|kamailio|opensips>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"reconnects": 3, // number of reconnects to rater/cdrs before giving up.
"debit_interval": 10, // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration bigger than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
},
"freeswitch": {
"server": "127.0.0.1:8021", // adress where to connect to FreeSWITCH socket.
"password": "ClueCon", // freeSWITCH socket password.
"reconnects": 5, // number of attempts on connect failure.
"min_dur_low_balance": "5s", // threshold which will trigger low balance warnings for prepaid calls (needs to be lower than debit_interval)
"low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls
"empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance
"empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined)
"cdr_extra_fields": [], // extra fields to store in CDRs in case of processing them
},
"kamailio": {
"evapi_addr": "127.0.0.1:8448", // address of the kamailio evapi server
"reconnects": 3, // number of attempts on connect failure
},
"opensips": {
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
"mi_addr": "127.0.0.1:8020", // adress where to reach OpenSIPS mi_datagram module
"events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
"reconnects": 3, // number of attempts on connect failure
},
"sm_freeswitch": {
"enabled": false, // starts SessionManager service: <true|false>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"cdr_extra_fields": [], // extra fields to store in CDRs in case of processing them
"debit_interval": "10s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
@@ -254,10 +223,11 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts SessionManager service: <true|false>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"debit_interval": "10s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
"connections":[ // Instantiate connections to multiple Kamailio servers
"connections":[ // instantiate connections to multiple Kamailio servers
{"evapi_addr": "127.0.0.1:8448", "reconnects": -1} // reconnects -1 to indefinitely connect
],
},
@@ -272,9 +242,8 @@ const CGRATES_CFG_JSON = `
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
"events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
"mi_addr": "127.0.0.1:8020", // Address where to reach OpenSIPS MI to send session disconnects
"mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
"reconnects": -1, // reconnects -1 to indefinitely connect
},
@@ -298,4 +267,5 @@ const CGRATES_CFG_JSON = `
"from_address": "cgr-mailer@localhost.localdomain" // from address used when sending emails out
},
}`

View File

@@ -206,54 +206,6 @@ func (self CgrJsonCfg) CdrcJsonCfg() (map[string]*CdrcJsonCfg, error) {
return cfg, nil
}
func (self CgrJsonCfg) SessionManagerJsonCfg() (*SessionManagerJsonCfg, error) {
rawCfg, hasKey := self[SM_JSN]
if !hasKey {
return nil, nil
}
cfg := new(SessionManagerJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) FSJsonCfg() (*FSJsonCfg, error) {
rawCfg, hasKey := self[FS_JSN]
if !hasKey {
return nil, nil
}
cfg := new(FSJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) KamailioJsonCfg() (*KamailioJsonCfg, error) {
rawCfg, hasKey := self[KAMAILIO_JSN]
if !hasKey {
return nil, nil
}
cfg := new(KamailioJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) OsipsJsonCfg() (*OsipsJsonCfg, error) {
rawCfg, hasKey := self[OSIPS_JSN]
if !hasKey {
return nil, nil
}
cfg := new(OsipsJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) SmFsJsonCfg() (*SmFsJsonCfg, error) {
rawCfg, hasKey := self[SMFS_JSN]
if !hasKey {

View File

@@ -334,73 +334,12 @@ func TestDfCdrcJsonCfg(t *testing.T) {
}
}
func TestDfSessionManagerJsonCfg(t *testing.T) {
eCfg := &SessionManagerJsonCfg{
Enabled: utils.BoolPointer(false),
Switch_type: utils.StringPointer("freeswitch"),
Rater: utils.StringPointer("internal"),
Cdrs: utils.StringPointer(""),
Reconnects: utils.IntPointer(3),
Debit_interval: utils.IntPointer(10),
Min_call_duration: utils.StringPointer("0s"),
Max_call_duration: utils.StringPointer("3h"),
}
if cfg, err := dfCgrJsonCfg.SessionManagerJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfFSJsonCfg(t *testing.T) {
eCfg := &FSJsonCfg{
Server: utils.StringPointer("127.0.0.1:8021"),
Password: utils.StringPointer("ClueCon"),
Reconnects: utils.IntPointer(5),
Min_dur_low_balance: utils.StringPointer("5s"),
Low_balance_ann_file: utils.StringPointer(""),
Empty_balance_context: utils.StringPointer(""),
Empty_balance_ann_file: utils.StringPointer(""),
Cdr_extra_fields: utils.StringSlicePointer([]string{}),
}
if cfg, err := dfCgrJsonCfg.FSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfKamailioJsonCfg(t *testing.T) {
eCfg := &KamailioJsonCfg{
Evapi_addr: utils.StringPointer("127.0.0.1:8448"),
Reconnects: utils.IntPointer(3),
}
if cfg, err := dfCgrJsonCfg.KamailioJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfOsipsJsonCfg(t *testing.T) {
eCfg := &OsipsJsonCfg{
Listen_udp: utils.StringPointer("127.0.0.1:2020"),
Mi_addr: utils.StringPointer("127.0.0.1:8020"),
Events_subscribe_interval: utils.StringPointer("60s"),
Reconnects: utils.IntPointer(3),
}
if cfg, err := dfCgrJsonCfg.OsipsJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestSmFsJsonCfg(t *testing.T) {
eCfg := &SmFsJsonCfg{
Enabled: utils.BoolPointer(false),
Rater: utils.StringPointer("internal"),
Cdrs: utils.StringPointer(""),
Reconnects: utils.IntPointer(5),
Cdr_extra_fields: utils.StringSlicePointer([]string{}),
Debit_interval: utils.StringPointer("10s"),
Min_call_duration: utils.StringPointer("0s"),
@@ -428,6 +367,7 @@ func TestSmKamJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Rater: utils.StringPointer("internal"),
Cdrs: utils.StringPointer(""),
Reconnects: utils.IntPointer(5),
Debit_interval: utils.StringPointer("10s"),
Min_call_duration: utils.StringPointer("0s"),
Max_call_duration: utils.StringPointer("3h"),

View File

@@ -153,49 +153,12 @@ type CdrcJsonCfg struct {
Cdr_fields *[]*CdrFieldJsonCfg
}
// Session manager config section
type SessionManagerJsonCfg struct {
Enabled *bool
Switch_type *string
Rater *string
Cdrs *string
Reconnects *int
Debit_interval *int
Min_call_duration *string
Max_call_duration *string
}
// FreeSWITCH config section
type FSJsonCfg struct {
Server *string
Password *string
Reconnects *int
Min_dur_low_balance *string
Low_balance_ann_file *string
Empty_balance_context *string
Empty_balance_ann_file *string
Cdr_extra_fields *[]string
}
// Kamailio config section
type KamailioJsonCfg struct {
Evapi_addr *string
Reconnects *int
}
// Opensips config section
type OsipsJsonCfg struct {
Listen_udp *string
Mi_addr *string
Events_subscribe_interval *string
Reconnects *int
}
// SM-FreeSWITCH config section
type SmFsJsonCfg struct {
Enabled *bool
Rater *string
Cdrs *string
Reconnects *int
Cdr_extra_fields *[]string
Debit_interval *string
Min_call_duration *string
@@ -219,6 +182,7 @@ type SmKamJsonCfg struct {
Enabled *bool
Rater *string
Cdrs *string
Reconnects *int
Debit_interval *string
Min_call_duration *string
Max_call_duration *string

View File

@@ -58,7 +58,8 @@ type SmFsConfig struct {
Enabled bool
Rater string
Cdrs string
CdrExtraFields []string
Reconnects int
CdrExtraFields []*utils.RSRField
DebitInterval time.Duration
MinCallDuration time.Duration
MaxCallDuration time.Duration
@@ -83,8 +84,13 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error {
if jsnCfg.Cdrs != nil {
self.Cdrs = *jsnCfg.Cdrs
}
if jsnCfg.Reconnects != nil {
self.Reconnects = *jsnCfg.Reconnects
}
if jsnCfg.Cdr_extra_fields != nil {
self.CdrExtraFields = *jsnCfg.Cdr_extra_fields
if self.CdrExtraFields, err = utils.ParseRSRFieldsFromSlice(*jsnCfg.Cdr_extra_fields); err != nil {
return err
}
}
if jsnCfg.Debit_interval != nil {
if self.DebitInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Debit_interval); err != nil {
@@ -157,6 +163,7 @@ type SmKamConfig struct {
Enabled bool
Rater string
Cdrs string
Reconnects int
DebitInterval time.Duration
MinCallDuration time.Duration
MaxCallDuration time.Duration
@@ -177,6 +184,9 @@ func (self *SmKamConfig) loadFromJsonCfg(jsnCfg *SmKamJsonCfg) error {
if jsnCfg.Cdrs != nil {
self.Cdrs = *jsnCfg.Cdrs
}
if jsnCfg.Reconnects != nil {
self.Reconnects = *jsnCfg.Reconnects
}
if jsnCfg.Debit_interval != nil {
if self.DebitInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Debit_interval); err != nil {
return err
@@ -202,14 +212,6 @@ func (self *SmKamConfig) loadFromJsonCfg(jsnCfg *SmKamJsonCfg) error {
return nil
}
// Returns the first cached default value for a SM-FreeSWITCH connection
func NewDfltOsipsConnConfig() *OsipsConnConfig {
if dfltOsipsConnConfig == nil {
return new(OsipsConnConfig) // No defaults, most probably we are building the defaults now
}
return dfltOsipsConnConfig
}
// Represents one connection instance towards OpenSIPS, not in use for now but planned for future
type OsipsConnConfig struct {
MiAddr string
@@ -232,12 +234,12 @@ type SmOsipsConfig struct {
ListenUdp string
Rater string
Cdrs string
Reconnects int
DebitInterval time.Duration
MinCallDuration time.Duration
MaxCallDuration time.Duration
EventsSubscribeInterval time.Duration
MiAddr string
Reconnects int
}
func (self *SmOsipsConfig) loadFromJsonCfg(jsnCfg *SmOsipsJsonCfg) error {

View File

@@ -6,7 +6,6 @@
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
//"general": {
// "http_skip_tls_veify": false, // if enabled Http Client will accept any TLS certificate
// "rounding_decimals": 10, // system level precision for floats
@@ -180,42 +179,50 @@
// }
//},
//"session_manager": {
//"sm_freeswitch": {
// "enabled": false, // starts SessionManager service: <true|false>
// "switch_type": "freeswitch", // defines the type of switch behind: <freeswitch|kamailio|opensips>
// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
// "cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
// "reconnects": 3, // number of reconnects to rater/cdrs before giving up.
// "debit_interval": 10, // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration bigger than this
// "reconnects": 5, // number of reconnect attempts to rater or cdrs
// "cdr_extra_fields": [], // extra fields to store in CDRs in case of processing them
// "debit_interval": "10s", // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
//},
//"freeswitch": {
// "server": "127.0.0.1:8021", // adress where to connect to FreeSWITCH socket.
// "password": "ClueCon", // freeSWITCH socket password.
// "reconnects": 5, // number of attempts on connect failure.
// "min_dur_low_balance": "5s", // threshold which will trigger low balance warnings for prepaid calls (needs to be lower than debit_interval)
// "low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls
// "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance
// "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance
// "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined)
// "cdr_extra_fields": [], // extra fields to store in CDRs in case of processing them
// "connections":[ // instantiate connections to multiple FreeSWITCH servers
// {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": -1} // reconnects -1 to indefinitely connect
// ],
//},
//"kamailio": {
// "evapi_addr": "127.0.0.1:8448", // address of the kamailio evapi server
// "reconnects": 3, // number of attempts on connect failure
//"sm_kamailio": {
// "enabled": false, // starts SessionManager service: <true|false>
// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
// "cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
// "reconnects": 5, // number of reconnect attempts to rater or cdrs
// "debit_interval": "10s", // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
// "connections":[ // instantiate connections to multiple Kamailio servers
// {"evapi_addr": "127.0.0.1:8448", "reconnects": -1} // reconnects -1 to indefinitely connect
// ],
//},
//"opensips": {
//"sm_opensips": {
// "enabled": false, // starts SessionManager service: <true|false>
// "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
// "mi_addr": "127.0.0.1:8020", // adress where to reach OpenSIPS mi_datagram module
// "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
// "cdrs": "", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
// "debit_interval": "10s", // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
// "events_subscribe_interval": "60s", // automatic events subscription to OpenSIPS, 0 to disable it
// "reconnects": 3, // number of attempts on connect failure
// "mi_addr": "127.0.0.1:8020", // address where to reach OpenSIPS MI to send session disconnects
// "reconnects": -1, // reconnects -1 to indefinitely connect
//},
@@ -239,4 +246,5 @@
// "from_address": "cgr-mailer@localhost.localdomain" // from address used when sending emails out
//},
}
//}

View File

@@ -88,13 +88,6 @@ func SetStorageLogger(sg LogStorage) {
storageLogger = sg
}
/*
Exported method to set the debit period for caching purposes.
*/
func SetDebitPeriod(d time.Duration) {
debitPeriod = d
}
// Exported method to set the history scribe.
func SetHistoryScribe(scribe history.Scribe) {
historyScribe = scribe

View File

@@ -165,7 +165,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev utils.Event, reply *float64) er
Account: ev.GetAccount(dc.AccountField),
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime,
TimeEnd: startTime.Add(config.CgrConfig().SMMaxCallDuration),
TimeEnd: startTime.Add(config.CgrConfig().MaxCallDuration),
}
var remainingDuration float64
err = rs.GetMaxSessionTime(cd, &remainingDuration)

View File

@@ -220,7 +220,7 @@ func (fsev FSEvent) GetOriginatorIP(fieldName string) string {
func (fsev FSEvent) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
for _, fldRule := range config.CgrConfig().FSCdrExtraFields {
for _, fldRule := range config.CgrConfig().SmFsConfig.CdrExtraFields {
extraFields[fldRule.Id] = fsev.ParseEventValue(fldRule)
}
return extraFields

View File

@@ -630,7 +630,7 @@ func TestFsEvAsStoredCdr(t *testing.T) {
func TestFsEvGetExtraFields(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
cfg.FSCdrExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}}
cfg.SmFsConfig.CdrExtraFields = []*utils.RSRField{&utils.RSRField{Id: "Channel-Read-Codec-Name"}, &utils.RSRField{Id: "Channel-Write-Codec-Name"}, &utils.RSRField{Id: "NonExistingHeader"}}
config.SetCgrConfig(cfg)
ev := new(FSEvent).AsEvent(hangupEv)
expectedExtraFields := map[string]string{"Channel-Read-Codec-Name": "G722", "Channel-Write-Codec-Name": "G722", "NonExistingHeader": ""}