cdrs stats RPC service and integration with cdrs/mediator

This commit is contained in:
Radu Ioan Fericean
2014-07-21 20:00:58 +03:00
parent 1289adc155
commit 8ac8b65a95
8 changed files with 129 additions and 17 deletions

View File

@@ -33,10 +33,19 @@ var (
cfg *config.CGRConfig // Share the configuration with the rest of the package
storage engine.CdrStorage
medi *mediator.Mediator
stats engine.StatsInterface
)
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
func storeAndMediate(storedCdr *utils.StoredCdr) error {
if stats != nil {
go func() {
var x int = 0 // not used
if err := stats.AppendCDR(storedCdr, &x); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not append cdr to stats: %s", err.Error()))
}
}()
}
if err := storage.SetCdr(storedCdr); err != nil {
return err
}
@@ -75,10 +84,23 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
type CDRS struct{}
func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS {
func New(s engine.CdrStorage, m *mediator.Mediator, st *engine.Stats, c *config.CGRConfig) *CDRS {
storage = s
medi = m
cfg = c
stats = st
if cfg.CDRSStats != "" {
if cfg.CDRSStats != utils.INTERNAL {
if s, err := engine.NewProxyStats(cfg.CDRSStats); err == nil {
stats = s
} else {
engine.Logger.Err(fmt.Sprintf("Errors connecting to CDRS stats service : %s", err.Error()))
}
}
} else {
// disable stats for cdrs
stats = nil
}
return &CDRS{}
}

View File

@@ -67,6 +67,7 @@ var (
server = &engine.Server{}
scribeServer history.Scribe
cdrServer *cdrs.CDRS
cdrStats *engine.Stats
sm sessionmanager.SessionManager
medi *mediator.Mediator
cfg *config.CGRConfig
@@ -111,7 +112,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
connector = &engine.RPCClientConnector{Client: client}
}
var err error
medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cfg)
medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cdrStats, cfg)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
exitChan <- true
@@ -185,7 +186,7 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d
return
}
}
cdrServer = cdrs.New(cdrDb, medi, cfg)
cdrServer = cdrs.New(cdrDb, medi, cdrStats, cfg)
cdrServer.RegisterHanlersToServer(server)
close(doneChan)
}
@@ -439,6 +440,11 @@ func main() {
go startMediator(responder, logDb, cdrDb, cacheChan, medChan)
}
if cfg.CDRStatsEnabled {
cdrStats = &engine.Stats{}
server.RpcRegister(cdrStats)
}
var cdrsChan chan struct{}
if cfg.CDRSEnabled {
engine.Logger.Info("Starting CGRateS CDRS service.")

View File

@@ -91,6 +91,8 @@ type CGRConfig struct {
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSStats string // Address where to reach the Mediator. <""|intenal>
CDRStatsEnabled bool // Enable CDR Stats service
//CdrStats []*cdrstats.CdrStats // Active cdr stats configuration instances
CdreDefaultInstance *CdreConfig // Will be used in the case no specific one selected by API
CdrcEnabled bool // Enable CDR client functionality
@@ -111,6 +113,7 @@ type CGRConfig struct {
SMMinCallDuration time.Duration // Only authorize calls with allowed duration bigger than this
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorStats string // Address where to reach the stats service: <internal|x.y.z.y:1234>
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
DerivedChargers utils.DerivedChargers // System wide derived chargers, added to the account level ones
CombinedDerivedChargers bool // Combine accounts specific derived_chargers with server configured
@@ -165,6 +168,8 @@ func (self *CGRConfig) setDefaults() error {
self.CDRSEnabled = false
self.CDRSExtraFields = []*utils.RSRField{}
self.CDRSMediator = ""
self.CDRSStats = ""
self.CDRStatsEnabled = false
self.CdreDefaultInstance, _ = NewDefaultCdreConfig()
self.CdrcEnabled = false
self.CdrcCdrs = utils.INTERNAL
@@ -189,13 +194,14 @@ func (self *CGRConfig) setDefaults() error {
utils.USAGE: &utils.RSRField{Id: "13"},
}
self.MediatorEnabled = false
self.MediatorRater = "internal"
self.MediatorRater = utils.INTERNAL
self.MediatorRaterReconnects = 3
self.MediatorStats = utils.INTERNAL
self.DerivedChargers = make(utils.DerivedChargers, 0)
self.CombinedDerivedChargers = true
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = "internal"
self.SMRater = utils.INTERNAL
self.SMRaterReconnects = 3
self.SMDebitInterval = 10
self.SMMaxCallDuration = time.Duration(3) * time.Hour
@@ -205,7 +211,7 @@ func (self *CGRConfig) setDefaults() error {
self.FreeswitchReconnects = 5
self.HistoryAgentEnabled = false
self.HistoryServerEnabled = false
self.HistoryServer = "internal"
self.HistoryServer = utils.INTERNAL
self.HistoryDir = "/var/log/cgrates/history"
self.HistorySaveInterval = time.Duration(1) * time.Second
self.MailerServer = "localhost:25"
@@ -397,6 +403,12 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("cdrs", "mediator"); hasOpt {
cfg.CDRSMediator, _ = c.GetString("cdrs", "mediator")
}
if hasOpt = c.HasOption("cdrs", "stats"); hasOpt {
cfg.CDRSStats, _ = c.GetString("cdrs", "stats")
}
if hasOpt = c.HasOption("stats", "enabled"); hasOpt {
cfg.CDRStatsEnabled, _ = c.GetBool("stats", "enabled")
}
if hasOpt = c.HasOption("cdre", "cdr_format"); hasOpt {
cfg.CdreDefaultInstance.CdrFormat, _ = c.GetString("cdre", "cdr_format")
}
@@ -494,6 +506,9 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt {
cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects")
}
if hasOpt = c.HasOption("mediator", "stats"); hasOpt {
cfg.MediatorStats, _ = c.GetString("mediator", "stats")
}
if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt {
cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled")
}

View File

@@ -83,6 +83,7 @@ func TestDefaults(t *testing.T) {
eCfg.CDRSEnabled = false
eCfg.CDRSExtraFields = []*utils.RSRField{}
eCfg.CDRSMediator = ""
eCfg.CDRStatsEnabled = false
eCfg.CdrcEnabled = false
eCfg.CdrcCdrs = utils.INTERNAL
eCfg.CdrcRunDelay = time.Duration(0)
@@ -106,11 +107,12 @@ func TestDefaults(t *testing.T) {
utils.USAGE: &utils.RSRField{Id: "13"},
}
eCfg.MediatorEnabled = false
eCfg.MediatorRater = "internal"
eCfg.MediatorRater = utils.INTERNAL
eCfg.MediatorRaterReconnects = 3
eCfg.MediatorStats = utils.INTERNAL
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS
eCfg.SMRater = "internal"
eCfg.SMRater = utils.INTERNAL
eCfg.SMRaterReconnects = 3
eCfg.SMDebitInterval = 10
eCfg.SMMinCallDuration = time.Duration(0)
@@ -121,7 +123,7 @@ func TestDefaults(t *testing.T) {
eCfg.DerivedChargers = make(utils.DerivedChargers, 0)
eCfg.CombinedDerivedChargers = true
eCfg.HistoryAgentEnabled = false
eCfg.HistoryServer = "internal"
eCfg.HistoryServer = utils.INTERNAL
eCfg.HistoryServerEnabled = false
eCfg.HistoryDir = "/var/log/cgrates/history"
eCfg.HistorySaveInterval = time.Duration(1) * time.Second
@@ -206,6 +208,8 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CDRSEnabled = true
eCfg.CDRSExtraFields = []*utils.RSRField{&utils.RSRField{Id: "test"}}
eCfg.CDRSMediator = "test"
eCfg.CDRStatsEnabled = true
eCfg.CDRSStats = "test"
eCfg.CdreDefaultInstance = &CdreConfig{
CdrFormat: "test",
FieldSeparator: utils.CSV_SEP,
@@ -243,6 +247,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.MediatorEnabled = true
eCfg.MediatorRater = "test"
eCfg.MediatorRaterReconnects = 99
eCfg.MediatorStats = "test"
eCfg.SMEnabled = true
eCfg.SMSwitchType = "test"
eCfg.SMRater = "test"

View File

@@ -45,6 +45,10 @@ enabled = true # Starts Scheduler service: <true|false>.
enabled = true # Start the CDR Server service: <true|false>.
extra_fields = test # Extra fields to scategorye in CDRs
mediator = test # Address where to reach the Mediacategory. Empty for disabling mediation. <""|internal>
stats = test # Address where to reach the stats sevre. Empty for disabling stats. <""|internal>
[stats]
enabled = true # Start the CDR stats service: <true|false>.
[cdre]
cdr_format = test # Exported CDRs format <csv>
@@ -83,7 +87,8 @@ extra_fields = test:test # Field identifiers of the fields to add in extra field
[mediator]
enabled = true # Starts Mediacategory service: <true|false>.
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
rater_reconnects = 99 # Number of reconnects to rater before giving up.
rater_reconnects = 99 # Number of reconnects to rater before giving up.
stats = test # Address where to reach the stats service: <internal|x.y.z.y:1234>
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.

View File

@@ -20,35 +20,72 @@ package engine
import (
"errors"
"net/rpc"
"sync"
"github.com/cgrates/cgrates/utils"
)
type StatsInterface interface {
AddQueue(*StatsQueue, *int) error
GetValues(string, *map[string]float64) error
AppendCDR(*utils.StoredCdr, *int) error
}
type Stats struct {
queues map[string]*StatsQueue
mux sync.RWMutex
}
func (s *Stats) AddQueue(sq *StatsQueue) {
func (s *Stats) AddQueue(sq *StatsQueue, out *int) error {
s.mux.Lock()
defer s.mux.Unlock()
s.queues[sq.conf.Id] = sq
*out = 0
return nil
}
func (s *Stats) GetValues(sqID string) (map[string]float64, error) {
func (s *Stats) GetValues(sqID string, values *map[string]float64) error {
s.mux.RLock()
defer s.mux.RUnlock()
if sq, ok := s.queues[sqID]; ok {
return sq.GetStats(), nil
*values = sq.GetStats()
return nil
}
return nil, errors.New("Not Found")
return errors.New("Not Found")
}
func (s *Stats) AppendCDR(cdr *utils.StoredCdr) {
func (s *Stats) AppendCDR(cdr *utils.StoredCdr, out *int) error {
s.mux.RLock()
defer s.mux.RUnlock()
for _, sq := range s.queues {
sq.AppendCDR(cdr)
}
*out = 0
return nil
}
type ProxyStats struct {
Client *rpc.Client
}
func NewProxyStats(addr string) (*ProxyStats, error) {
client, err := rpc.Dial("tcp", addr)
if err != nil {
return nil, err
}
return &ProxyStats{Client: client}, nil
}
func (ps *ProxyStats) AddQueue(sq *StatsQueue, out *int) error {
return ps.Client.Call("Scribe.AddQueue", sq, out)
}
func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error {
return ps.Client.Call("Scribe.GetValues", sqID, values)
}
func (ps *ProxyStats) AppendCDR(cdr *utils.StoredCdr, out *int) error {
return ps.Client.Call("Scribe.AppendCDR", cdr, out)
}

View File

@@ -215,7 +215,7 @@ var jsonCdr = []byte(`{"core-uuid":"feef0b51-7fdf-4c4a-878e-aff233752de2","chann
func TestEvCorelate(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
cdrs.New(nil, nil, cfg) // So we can set the package cfg
cdrs.New(nil, nil, nil, cfg) // So we can set the package cfg
answerEv := new(sessionmanager.FSEvent).New(answerEvent)
if answerEv.GetName() != "CHANNEL_ANSWER" {
t.Error("Event not parsed correctly: ", answerEv)

View File

@@ -28,13 +28,26 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, st engine.StatsInterface, cfg *config.CGRConfig) (m *Mediator, err error) {
m = &Mediator{
connector: connector,
logDb: logDb,
cdrDb: cdrDb,
stats: st,
cgrCfg: cfg,
}
if m.cgrCfg.MediatorStats != "" {
if m.cgrCfg.MediatorStats != utils.INTERNAL {
if s, err := engine.NewProxyStats(m.cgrCfg.MediatorStats); err == nil {
m.stats = s
} else {
engine.Logger.Err(fmt.Sprintf("Errors connecting to CDRS stats service (mediator): %s", err.Error()))
}
}
} else {
// disable stats for mediator
m.stats = nil
}
return m, nil
}
@@ -42,6 +55,7 @@ type Mediator struct {
connector engine.Connector
logDb engine.LogStorage
cdrDb engine.CdrStorage
stats engine.StatsInterface
cgrCfg *config.CGRConfig
}
@@ -155,6 +169,14 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
if err := self.rateCDR(cdr); err != nil {
extraInfo = err.Error()
}
if self.stats != nil {
go func() {
var x int = 0 // not used
if err := self.stats.AppendCDR(cdr, &x); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not append cdr to stats (mediator): %s", err.Error()))
}
}()
}
if err := self.cdrDb.SetRatedCdr(cdr, extraInfo); err != nil {
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not record cost for cgrid: <%s>, ERROR: <%s>, cost: %f, extraInfo: %s",
cdr.CgrId, err.Error(), cdr.Cost, extraInfo))