diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 7122b20cc..3f801fbd3 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -33,10 +33,19 @@ var ( cfg *config.CGRConfig // Share the configuration with the rest of the package storage engine.CdrStorage medi *mediator.Mediator + stats engine.StatsInterface ) // Returns error if not able to properly store the CDR, mediation is async since we can always recover offline func storeAndMediate(storedCdr *utils.StoredCdr) error { + if stats != nil { + go func() { + var x int = 0 // not used + if err := stats.AppendCDR(storedCdr, &x); err != nil { + engine.Logger.Err(fmt.Sprintf("Could not append cdr to stats: %s", err.Error())) + } + }() + } if err := storage.SetCdr(storedCdr); err != nil { return err } @@ -75,10 +84,23 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { type CDRS struct{} -func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS { +func New(s engine.CdrStorage, m *mediator.Mediator, st *engine.Stats, c *config.CGRConfig) *CDRS { storage = s medi = m cfg = c + stats = st + if cfg.CDRSStats != "" { + if cfg.CDRSStats != utils.INTERNAL { + if s, err := engine.NewProxyStats(cfg.CDRSStats); err == nil { + stats = s + } else { + engine.Logger.Err(fmt.Sprintf("Errors connecting to CDRS stats service : %s", err.Error())) + } + } + } else { + // disable stats for cdrs + stats = nil + } return &CDRS{} } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c91c0bce3..da8530dab 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -67,6 +67,7 @@ var ( server = &engine.Server{} scribeServer history.Scribe cdrServer *cdrs.CDRS + cdrStats *engine.Stats sm sessionmanager.SessionManager medi *mediator.Mediator cfg *config.CGRConfig @@ -111,7 +112,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD connector = &engine.RPCClientConnector{Client: client} } var err error - medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cfg) + medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cdrStats, cfg) if err != nil { engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err)) exitChan <- true @@ -185,7 +186,7 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d return } } - cdrServer = cdrs.New(cdrDb, medi, cfg) + cdrServer = cdrs.New(cdrDb, medi, cdrStats, cfg) cdrServer.RegisterHanlersToServer(server) close(doneChan) } @@ -439,6 +440,11 @@ func main() { go startMediator(responder, logDb, cdrDb, cacheChan, medChan) } + if cfg.CDRStatsEnabled { + cdrStats = &engine.Stats{} + server.RpcRegister(cdrStats) + } + var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") diff --git a/config/config.go b/config/config.go index 007e87aa9..7e0cf853e 100644 --- a/config/config.go +++ b/config/config.go @@ -91,6 +91,8 @@ type CGRConfig struct { CDRSEnabled bool // Enable CDR Server service CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal> + CDRSStats string // Address where to reach the Mediator. <""|intenal> + CDRStatsEnabled bool // Enable CDR Stats service //CdrStats []*cdrstats.CdrStats // Active cdr stats configuration instances CdreDefaultInstance *CdreConfig // Will be used in the case no specific one selected by API CdrcEnabled bool // Enable CDR client functionality @@ -111,6 +113,7 @@ type CGRConfig struct { SMMinCallDuration time.Duration // Only authorize calls with allowed duration bigger than this MediatorEnabled bool // Starts Mediator service: . MediatorRater string // Address where to reach the Rater: + MediatorStats string // Address where to reach the stats service: MediatorRaterReconnects int // Number of reconnects to rater before giving up. DerivedChargers utils.DerivedChargers // System wide derived chargers, added to the account level ones CombinedDerivedChargers bool // Combine accounts specific derived_chargers with server configured @@ -165,6 +168,8 @@ func (self *CGRConfig) setDefaults() error { self.CDRSEnabled = false self.CDRSExtraFields = []*utils.RSRField{} self.CDRSMediator = "" + self.CDRSStats = "" + self.CDRStatsEnabled = false self.CdreDefaultInstance, _ = NewDefaultCdreConfig() self.CdrcEnabled = false self.CdrcCdrs = utils.INTERNAL @@ -189,13 +194,14 @@ func (self *CGRConfig) setDefaults() error { utils.USAGE: &utils.RSRField{Id: "13"}, } self.MediatorEnabled = false - self.MediatorRater = "internal" + self.MediatorRater = utils.INTERNAL self.MediatorRaterReconnects = 3 + self.MediatorStats = utils.INTERNAL self.DerivedChargers = make(utils.DerivedChargers, 0) self.CombinedDerivedChargers = true self.SMEnabled = false self.SMSwitchType = FS - self.SMRater = "internal" + self.SMRater = utils.INTERNAL self.SMRaterReconnects = 3 self.SMDebitInterval = 10 self.SMMaxCallDuration = time.Duration(3) * time.Hour @@ -205,7 +211,7 @@ func (self *CGRConfig) setDefaults() error { self.FreeswitchReconnects = 5 self.HistoryAgentEnabled = false self.HistoryServerEnabled = false - self.HistoryServer = "internal" + self.HistoryServer = utils.INTERNAL self.HistoryDir = "/var/log/cgrates/history" self.HistorySaveInterval = time.Duration(1) * time.Second self.MailerServer = "localhost:25" @@ -397,6 +403,12 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("cdrs", "mediator"); hasOpt { cfg.CDRSMediator, _ = c.GetString("cdrs", "mediator") } + if hasOpt = c.HasOption("cdrs", "stats"); hasOpt { + cfg.CDRSStats, _ = c.GetString("cdrs", "stats") + } + if hasOpt = c.HasOption("stats", "enabled"); hasOpt { + cfg.CDRStatsEnabled, _ = c.GetBool("stats", "enabled") + } if hasOpt = c.HasOption("cdre", "cdr_format"); hasOpt { cfg.CdreDefaultInstance.CdrFormat, _ = c.GetString("cdre", "cdr_format") } @@ -494,6 +506,9 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt { cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects") } + if hasOpt = c.HasOption("mediator", "stats"); hasOpt { + cfg.MediatorStats, _ = c.GetString("mediator", "stats") + } if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt { cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled") } diff --git a/config/config_test.go b/config/config_test.go index 16e8d0736..4150f6720 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -83,6 +83,7 @@ func TestDefaults(t *testing.T) { eCfg.CDRSEnabled = false eCfg.CDRSExtraFields = []*utils.RSRField{} eCfg.CDRSMediator = "" + eCfg.CDRStatsEnabled = false eCfg.CdrcEnabled = false eCfg.CdrcCdrs = utils.INTERNAL eCfg.CdrcRunDelay = time.Duration(0) @@ -106,11 +107,12 @@ func TestDefaults(t *testing.T) { utils.USAGE: &utils.RSRField{Id: "13"}, } eCfg.MediatorEnabled = false - eCfg.MediatorRater = "internal" + eCfg.MediatorRater = utils.INTERNAL eCfg.MediatorRaterReconnects = 3 + eCfg.MediatorStats = utils.INTERNAL eCfg.SMEnabled = false eCfg.SMSwitchType = FS - eCfg.SMRater = "internal" + eCfg.SMRater = utils.INTERNAL eCfg.SMRaterReconnects = 3 eCfg.SMDebitInterval = 10 eCfg.SMMinCallDuration = time.Duration(0) @@ -121,7 +123,7 @@ func TestDefaults(t *testing.T) { eCfg.DerivedChargers = make(utils.DerivedChargers, 0) eCfg.CombinedDerivedChargers = true eCfg.HistoryAgentEnabled = false - eCfg.HistoryServer = "internal" + eCfg.HistoryServer = utils.INTERNAL eCfg.HistoryServerEnabled = false eCfg.HistoryDir = "/var/log/cgrates/history" eCfg.HistorySaveInterval = time.Duration(1) * time.Second @@ -206,6 +208,8 @@ func TestConfigFromFile(t *testing.T) { eCfg.CDRSEnabled = true eCfg.CDRSExtraFields = []*utils.RSRField{&utils.RSRField{Id: "test"}} eCfg.CDRSMediator = "test" + eCfg.CDRStatsEnabled = true + eCfg.CDRSStats = "test" eCfg.CdreDefaultInstance = &CdreConfig{ CdrFormat: "test", FieldSeparator: utils.CSV_SEP, @@ -243,6 +247,7 @@ func TestConfigFromFile(t *testing.T) { eCfg.MediatorEnabled = true eCfg.MediatorRater = "test" eCfg.MediatorRaterReconnects = 99 + eCfg.MediatorStats = "test" eCfg.SMEnabled = true eCfg.SMSwitchType = "test" eCfg.SMRater = "test" diff --git a/config/test_data.txt b/config/test_data.txt index 9cd147771..f45b32c2e 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -45,6 +45,10 @@ enabled = true # Starts Scheduler service: . enabled = true # Start the CDR Server service: . extra_fields = test # Extra fields to scategorye in CDRs mediator = test # Address where to reach the Mediacategory. Empty for disabling mediation. <""|internal> +stats = test # Address where to reach the stats sevre. Empty for disabling stats. <""|internal> + +[stats] +enabled = true # Start the CDR stats service: . [cdre] cdr_format = test # Exported CDRs format @@ -83,7 +87,8 @@ extra_fields = test:test # Field identifiers of the fields to add in extra field [mediator] enabled = true # Starts Mediacategory service: . rater = test # Address where to reach the Rater: -rater_reconnects = 99 # Number of reconnects to rater before giving up. +rater_reconnects = 99 # Number of reconnects to rater before giving up. +stats = test # Address where to reach the stats service: [session_manager] enabled = true # Starts SessionManager service: . diff --git a/engine/stats.go b/engine/stats.go index 8cca944b3..81e808fac 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -20,35 +20,72 @@ package engine import ( "errors" + "net/rpc" "sync" "github.com/cgrates/cgrates/utils" ) +type StatsInterface interface { + AddQueue(*StatsQueue, *int) error + GetValues(string, *map[string]float64) error + AppendCDR(*utils.StoredCdr, *int) error +} + type Stats struct { queues map[string]*StatsQueue mux sync.RWMutex } -func (s *Stats) AddQueue(sq *StatsQueue) { +func (s *Stats) AddQueue(sq *StatsQueue, out *int) error { s.mux.Lock() defer s.mux.Unlock() s.queues[sq.conf.Id] = sq + *out = 0 + return nil } -func (s *Stats) GetValues(sqID string) (map[string]float64, error) { +func (s *Stats) GetValues(sqID string, values *map[string]float64) error { s.mux.RLock() defer s.mux.RUnlock() if sq, ok := s.queues[sqID]; ok { - return sq.GetStats(), nil + *values = sq.GetStats() + return nil } - return nil, errors.New("Not Found") + return errors.New("Not Found") } -func (s *Stats) AppendCDR(cdr *utils.StoredCdr) { +func (s *Stats) AppendCDR(cdr *utils.StoredCdr, out *int) error { s.mux.RLock() defer s.mux.RUnlock() for _, sq := range s.queues { sq.AppendCDR(cdr) } + *out = 0 + return nil +} + +type ProxyStats struct { + Client *rpc.Client +} + +func NewProxyStats(addr string) (*ProxyStats, error) { + client, err := rpc.Dial("tcp", addr) + + if err != nil { + return nil, err + } + return &ProxyStats{Client: client}, nil +} + +func (ps *ProxyStats) AddQueue(sq *StatsQueue, out *int) error { + return ps.Client.Call("Scribe.AddQueue", sq, out) +} + +func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error { + return ps.Client.Call("Scribe.GetValues", sqID, values) +} + +func (ps *ProxyStats) AppendCDR(cdr *utils.StoredCdr, out *int) error { + return ps.Client.Call("Scribe.AppendCDR", cdr, out) } diff --git a/general_tests/fsevcorelate_test.go b/general_tests/fsevcorelate_test.go index addee97b6..bccdcba41 100644 --- a/general_tests/fsevcorelate_test.go +++ b/general_tests/fsevcorelate_test.go @@ -215,7 +215,7 @@ var jsonCdr = []byte(`{"core-uuid":"feef0b51-7fdf-4c4a-878e-aff233752de2","chann func TestEvCorelate(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() - cdrs.New(nil, nil, cfg) // So we can set the package cfg + cdrs.New(nil, nil, nil, cfg) // So we can set the package cfg answerEv := new(sessionmanager.FSEvent).New(answerEvent) if answerEv.GetName() != "CHANNEL_ANSWER" { t.Error("Event not parsed correctly: ", answerEv) diff --git a/mediator/mediator.go b/mediator/mediator.go index 14757881d..6b66e388e 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -28,13 +28,26 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) { +func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, st engine.StatsInterface, cfg *config.CGRConfig) (m *Mediator, err error) { m = &Mediator{ connector: connector, logDb: logDb, cdrDb: cdrDb, + stats: st, cgrCfg: cfg, } + if m.cgrCfg.MediatorStats != "" { + if m.cgrCfg.MediatorStats != utils.INTERNAL { + if s, err := engine.NewProxyStats(m.cgrCfg.MediatorStats); err == nil { + m.stats = s + } else { + engine.Logger.Err(fmt.Sprintf("Errors connecting to CDRS stats service (mediator): %s", err.Error())) + } + } + } else { + // disable stats for mediator + m.stats = nil + } return m, nil } @@ -42,6 +55,7 @@ type Mediator struct { connector engine.Connector logDb engine.LogStorage cdrDb engine.CdrStorage + stats engine.StatsInterface cgrCfg *config.CGRConfig } @@ -155,6 +169,14 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error { if err := self.rateCDR(cdr); err != nil { extraInfo = err.Error() } + if self.stats != nil { + go func() { + var x int = 0 // not used + if err := self.stats.AppendCDR(cdr, &x); err != nil { + engine.Logger.Err(fmt.Sprintf("Could not append cdr to stats (mediator): %s", err.Error())) + } + }() + } if err := self.cdrDb.SetRatedCdr(cdr, extraInfo); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not record cost for cgrid: <%s>, ERROR: <%s>, cost: %f, extraInfo: %s", cdr.CgrId, err.Error(), cdr.Cost, extraInfo))