added new connections to cdrs service

This commit is contained in:
Radu Ioan Fericean
2015-09-04 16:59:36 +03:00
parent 964ced8432
commit 0a1fc7d96d
7 changed files with 100 additions and 11 deletions

View File

@@ -229,7 +229,9 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
}
func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage,
internalRaterChan chan *engine.Responder, internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) {
internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber,
internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) {
engine.Logger.Info("Starting CGRateS CDRS service.")
var err error
var client *rpcclient.RpcClient
@@ -248,6 +250,57 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
}
raterConn = &engine.RPCClientConnector{Client: client}
}
// Pubsub connection init
var pubSubConn engine.PublisherSubscriber
if cfg.CDRSPubSub == utils.INTERNAL {
pubSubConn = <-internalPubSubSChan
} else if len(cfg.CDRSPubSub) != 0 {
if cfg.CDRSRater == cfg.CDRSPubSub {
pubSubConn = &engine.ProxyPubSub{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to pubsub server: %s", err.Error()))
exitChan <- true
return
}
pubSubConn = &engine.ProxyPubSub{Client: client}
}
}
// Users connection init
var usersConn engine.UserService
if cfg.CDRSUsers == utils.INTERNAL {
usersConn = <-internalUserSChan
} else if len(cfg.CDRSUsers) != 0 {
if cfg.CDRSRater == cfg.CDRSUsers {
usersConn = &engine.ProxyUserService{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to users server: %s", err.Error()))
exitChan <- true
return
}
usersConn = &engine.ProxyUserService{Client: client}
}
}
// Aliases connection init
var aliasesConn engine.AliasService
if cfg.CDRSAliases == utils.INTERNAL {
aliasesConn = <-internalAliaseSChan
} else if len(cfg.CDRSAliases) != 0 {
if cfg.CDRSRater == cfg.CDRSAliases {
aliasesConn = &engine.ProxyAliasService{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to aliases server: %s", err.Error()))
exitChan <- true
return
}
aliasesConn = &engine.ProxyAliasService{Client: client}
}
}
// Stats connection init
var statsConn engine.StatsInterface
if cfg.CDRSStats == utils.INTERNAL {
@@ -266,7 +319,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
}
}
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn)
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, pubSubConn, usersConn, aliasesConn, statsConn)
engine.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHanlersToServer(server)
engine.Logger.Info("Registering CDRS RPC service.")
@@ -488,7 +541,7 @@ func main() {
// Start CDR Server
if cfg.CDRSEnabled {
go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalCdrStatSChan, server, exitChan)
go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan)
}
// Start CDR Stats server

View File

@@ -206,6 +206,9 @@ type CGRConfig struct {
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSStoreCdrs bool // store cdrs in storDb
CDRSRater string // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234>
CDRSPubSub string // address where to reach the pubsub service: <""|internal|x.y.z.y:1234>
CDRSUsers string // address where to reach the users service: <""|internal|x.y.z.y:1234>
CDRSAliases string // address where to reach the aliases service: <""|internal|x.y.z.y:1234>
CDRSStats string // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234>
CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers
CDRStatsEnabled bool // Enable CDR Stats service
@@ -263,6 +266,15 @@ func (self *CGRConfig) checkConfigSanity() error {
if self.CDRSRater == utils.INTERNAL && !self.RaterEnabled {
return errors.New("Rater not enabled but requested by CDRS component.")
}
if self.CDRSPubSub == utils.INTERNAL && !self.PubSubServerEnabled {
return errors.New("PubSub service not enabled but requested by CDRS component.")
}
if self.CDRSUsers == utils.INTERNAL && !self.UserServerEnabled {
return errors.New("Users service not enabled but requested by CDRS component.")
}
if self.CDRSAliases == utils.INTERNAL && !self.AliasesServerEnabled {
return errors.New("Aliases service not enabled but requested by CDRS component.")
}
if self.CDRSStats == utils.INTERNAL && !self.CDRStatsEnabled {
return errors.New("CDRStats not enabled but requested by CDRS component.")
}
@@ -612,6 +624,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnCdrsCfg.Rater != nil {
self.CDRSRater = *jsnCdrsCfg.Rater
}
if jsnCdrsCfg.Pubsubs != nil {
self.CDRSPubSub = *jsnCdrsCfg.Pubsubs
}
if jsnCdrsCfg.Users != nil {
self.CDRSUsers = *jsnCdrsCfg.Users
}
if jsnCdrsCfg.Aliases != nil {
self.CDRSAliases = *jsnCdrsCfg.Aliases
}
if jsnCdrsCfg.Cdrstats != nil {
self.CDRSStats = *jsnCdrsCfg.Cdrstats
}

View File

@@ -108,6 +108,9 @@ const CGRATES_CFG_JSON = `
"extra_fields": [], // extra fields to store in CDRs for non-generic CDRs
"store_cdrs": true, // store cdrs in storDb
"rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234>
"pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"cdr_replication":[], // replicate the raw CDR to a number of servers

View File

@@ -147,13 +147,16 @@ func TestDfCdrsJsonCfg(t *testing.T) {
Extra_fields: utils.StringSlicePointer([]string{}),
Store_cdrs: utils.BoolPointer(true),
Rater: utils.StringPointer("internal"),
Pubsubs: utils.StringPointer(""),
Users: utils.StringPointer(""),
Aliases: utils.StringPointer(""),
Cdrstats: utils.StringPointer(""),
Cdr_replication: &[]*CdrReplicationJsonCfg{},
}
if cfg, err := dfCgrJsonCfg.CdrsJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", *cfg)
t.Errorf("Received: %+v", *cfg)
}
}

View File

@@ -80,6 +80,9 @@ type CdrsJsonCfg struct {
Extra_fields *[]string
Store_cdrs *bool
Rater *string
Pubsubs *string
Users *string
Aliases *string
Cdrstats *string
Cdr_replication *[]*CdrReplicationJsonCfg
}

View File

@@ -86,6 +86,9 @@
// "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs
// "store_cdrs": true, // store cdrs in storDb
// "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234>
// "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
// "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
// "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
// "reconnects": 5, // number of reconnect attempts to rater or cdrs
// "cdr_replication":[], // replicate the raw CDR to a number of servers

View File

@@ -64,16 +64,19 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
}
type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
stats StatsInterface
guard *GuardianLock
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
pubsub PublisherSubscriber
users UserService
aliases AliasService
stats StatsInterface
guard *GuardianLock
}
func (self *CdrServer) Timezone() string {