From 0a1fc7d96d35b65143ff97bfc4faf210febc0b75 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 4 Sep 2015 16:59:36 +0300 Subject: [PATCH] added new connections to cdrs service --- cmd/cgr-engine/cgr-engine.go | 59 ++++++++++++++++++++++++++++++++-- config/config.go | 21 ++++++++++++ config/config_defaults.go | 3 ++ config/config_json_test.go | 5 ++- config/libconfig_json.go | 3 ++ data/conf/cgrates/cgrates.json | 3 ++ engine/cdrs.go | 17 ++++++---- 7 files changed, 100 insertions(+), 11 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 193f3c4d4..16d505af1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -229,7 +229,9 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS } func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage, - internalRaterChan chan *engine.Responder, internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) { + internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber, + internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, + internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) { engine.Logger.Info("Starting CGRateS CDRS service.") var err error var client *rpcclient.RpcClient @@ -248,6 +250,57 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, } raterConn = &engine.RPCClientConnector{Client: client} } + // Pubsub connection init + var pubSubConn engine.PublisherSubscriber + if cfg.CDRSPubSub == utils.INTERNAL { + pubSubConn = <-internalPubSubSChan + } else if len(cfg.CDRSPubSub) != 0 { + if cfg.CDRSRater == cfg.CDRSPubSub { + pubSubConn = &engine.ProxyPubSub{Client: client} + } else { + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to pubsub server: %s", err.Error())) + exitChan <- true + return + } + pubSubConn = &engine.ProxyPubSub{Client: client} + } + } + // Users connection init + var usersConn engine.UserService + if cfg.CDRSUsers == utils.INTERNAL { + usersConn = <-internalUserSChan + } else if len(cfg.CDRSUsers) != 0 { + if cfg.CDRSRater == cfg.CDRSUsers { + usersConn = &engine.ProxyUserService{Client: client} + } else { + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to users server: %s", err.Error())) + exitChan <- true + return + } + usersConn = &engine.ProxyUserService{Client: client} + } + } + // Aliases connection init + var aliasesConn engine.AliasService + if cfg.CDRSAliases == utils.INTERNAL { + aliasesConn = <-internalAliaseSChan + } else if len(cfg.CDRSAliases) != 0 { + if cfg.CDRSRater == cfg.CDRSAliases { + aliasesConn = &engine.ProxyAliasService{Client: client} + } else { + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to aliases server: %s", err.Error())) + exitChan <- true + return + } + aliasesConn = &engine.ProxyAliasService{Client: client} + } + } // Stats connection init var statsConn engine.StatsInterface if cfg.CDRSStats == utils.INTERNAL { @@ -266,7 +319,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, } } - cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn) + cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, pubSubConn, usersConn, aliasesConn, statsConn) engine.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHanlersToServer(server) engine.Logger.Info("Registering CDRS RPC service.") @@ -488,7 +541,7 @@ func main() { // Start CDR Server if cfg.CDRSEnabled { - go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalCdrStatSChan, server, exitChan) + go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan) } // Start CDR Stats server diff --git a/config/config.go b/config/config.go index 6d46974d6..41f64d958 100644 --- a/config/config.go +++ b/config/config.go @@ -206,6 +206,9 @@ type CGRConfig struct { CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs CDRSStoreCdrs bool // store cdrs in storDb CDRSRater string // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234> + CDRSPubSub string // address where to reach the pubsub service: <""|internal|x.y.z.y:1234> + CDRSUsers string // address where to reach the users service: <""|internal|x.y.z.y:1234> + CDRSAliases string // address where to reach the aliases service: <""|internal|x.y.z.y:1234> CDRSStats string // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234> CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers CDRStatsEnabled bool // Enable CDR Stats service @@ -263,6 +266,15 @@ func (self *CGRConfig) checkConfigSanity() error { if self.CDRSRater == utils.INTERNAL && !self.RaterEnabled { return errors.New("Rater not enabled but requested by CDRS component.") } + if self.CDRSPubSub == utils.INTERNAL && !self.PubSubServerEnabled { + return errors.New("PubSub service not enabled but requested by CDRS component.") + } + if self.CDRSUsers == utils.INTERNAL && !self.UserServerEnabled { + return errors.New("Users service not enabled but requested by CDRS component.") + } + if self.CDRSAliases == utils.INTERNAL && !self.AliasesServerEnabled { + return errors.New("Aliases service not enabled but requested by CDRS component.") + } if self.CDRSStats == utils.INTERNAL && !self.CDRStatsEnabled { return errors.New("CDRStats not enabled but requested by CDRS component.") } @@ -612,6 +624,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnCdrsCfg.Rater != nil { self.CDRSRater = *jsnCdrsCfg.Rater } + if jsnCdrsCfg.Pubsubs != nil { + self.CDRSPubSub = *jsnCdrsCfg.Pubsubs + } + if jsnCdrsCfg.Users != nil { + self.CDRSUsers = *jsnCdrsCfg.Users + } + if jsnCdrsCfg.Aliases != nil { + self.CDRSAliases = *jsnCdrsCfg.Aliases + } if jsnCdrsCfg.Cdrstats != nil { self.CDRSStats = *jsnCdrsCfg.Cdrstats } diff --git a/config/config_defaults.go b/config/config_defaults.go index 785774405..39382f811 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -108,6 +108,9 @@ const CGRATES_CFG_JSON = ` "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs "store_cdrs": true, // store cdrs in storDb "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> + "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> + "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> "reconnects": 5, // number of reconnect attempts to rater or cdrs "cdr_replication":[], // replicate the raw CDR to a number of servers diff --git a/config/config_json_test.go b/config/config_json_test.go index b5423276a..864663323 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -147,13 +147,16 @@ func TestDfCdrsJsonCfg(t *testing.T) { Extra_fields: utils.StringSlicePointer([]string{}), Store_cdrs: utils.BoolPointer(true), Rater: utils.StringPointer("internal"), + Pubsubs: utils.StringPointer(""), + Users: utils.StringPointer(""), + Aliases: utils.StringPointer(""), Cdrstats: utils.StringPointer(""), Cdr_replication: &[]*CdrReplicationJsonCfg{}, } if cfg, err := dfCgrJsonCfg.CdrsJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { - t.Error("Received: ", *cfg) + t.Errorf("Received: %+v", *cfg) } } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index d20525720..8e3fe2d6a 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -80,6 +80,9 @@ type CdrsJsonCfg struct { Extra_fields *[]string Store_cdrs *bool Rater *string + Pubsubs *string + Users *string + Aliases *string Cdrstats *string Cdr_replication *[]*CdrReplicationJsonCfg } diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index f00547b68..afe5206e1 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -86,6 +86,9 @@ // "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs // "store_cdrs": true, // store cdrs in storDb // "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> +// "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> +// "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> +// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> // "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> // "reconnects": 5, // number of reconnect attempts to rater or cdrs // "cdr_replication":[], // replicate the raw CDR to a number of servers diff --git a/engine/cdrs.go b/engine/cdrs.go index e76f8e03f..7f72fd274 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -64,16 +64,19 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) { - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil +func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { + return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil } type CdrServer struct { - cgrCfg *config.CGRConfig - cdrDb CdrStorage - rater Connector - stats StatsInterface - guard *GuardianLock + cgrCfg *config.CGRConfig + cdrDb CdrStorage + rater Connector + pubsub PublisherSubscriber + users UserService + aliases AliasService + stats StatsInterface + guard *GuardianLock } func (self *CdrServer) Timezone() string {