StatS integrated in engine start routine

This commit is contained in:
DanB
2017-08-02 13:51:44 +02:00
parent adb7864998
commit 2336ff85d4
10 changed files with 96 additions and 28 deletions

View File

@@ -39,6 +39,7 @@ import (
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/stats"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -545,6 +546,25 @@ func startResourceLimiterService(internalRLSChan, internalCdrStatSChan chan rpcc
internalRLSChan <- rlsV1
}
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) {
sts, err := stats.NewStatService(dataDB, ms, cfg.StatSCfg().StoreInterval)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting Stat service"))
go func() {
if err := sts.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Error: %s listening for packets", err.Error()))
exitChan <- true
return
}
}()
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRLsChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
@@ -656,6 +676,17 @@ func main() {
// Init cache
cache.NewCache(cfg.CacheConfig)
var ms engine.Marshaler
if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil {
log.Fatalf("error initializing marshaler: ", err)
return
}
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
var dataDB engine.DataDB
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
@@ -709,6 +740,7 @@ func main() {
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalRLSChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan)
@@ -802,6 +834,10 @@ func main() {
go startResourceLimiterService(internalRLSChan, internalCdrStatSChan, cfg, dataDB, server, exitChan)
}
if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalSMGChan)

View File

@@ -625,6 +625,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
return err
}
jsnStatSCfg, err := jsnCfg.StatSJsonCfg()
if err != nil {
return err
}
jsnMailerCfg, err := jsnCfg.MailerJsonCfg()
if err != nil {
return err
@@ -1052,6 +1057,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
if jsnStatSCfg != nil {
if self.statsCfg == nil {
self.statsCfg = new(StatSCfg)
}
if self.statsCfg.loadFromJsonCfg(jsnStatSCfg); err != nil {
return err
}
}
if jsnUserServCfg != nil {
if jsnUserServCfg.Enabled != nil {
self.UserServerEnabled = *jsnUserServCfg.Enabled
@@ -1110,6 +1124,7 @@ func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig {
return self.resourceLimiterCfg
}
// ToDo: fix locking
func (cfg *CGRConfig) StatSCfg() *StatSCfg {
return cfg.statsCfg
}

View File

@@ -405,13 +405,13 @@ const CGRATES_CFG_JSON = `
"rls": {
"enabled": false, // starts ResourceLimiter service: <true|false>.
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
},
"stats": {
"enabled": false, // starts ResourceLimiter service: <true|false>.
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"enabled": false, // starts Stat service: <true|false>.
"store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
},

View File

@@ -665,9 +665,9 @@ func TestDfUserServJsonCfg(t *testing.T) {
func TestDfResourceLimiterSJsonCfg(t *testing.T) {
eCfg := &ResourceLimiterServJsonCfg{
Enabled: utils.BoolPointer(false),
Cdrstats_conns: &[]*HaPoolJsonCfg{},
Cache_dump_interval: utils.StringPointer("0s"),
Enabled: utils.BoolPointer(false),
Cdrstats_conns: &[]*HaPoolJsonCfg{},
Store_interval: utils.StringPointer("0s"),
}
if cfg, err := dfCgrJsonCfg.ResourceLimiterJsonCfg(); err != nil {
t.Error(err)
@@ -678,8 +678,8 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) {
func TestDfStatServiceJsonCfg(t *testing.T) {
eCfg := &StatServJsonCfg{
Enabled: utils.BoolPointer(false),
Cache_dump_interval: utils.StringPointer("0s"),
Enabled: utils.BoolPointer(false),
Store_interval: utils.StringPointer("0s"),
}
if cfg, err := dfCgrJsonCfg.StatSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -553,9 +553,9 @@ func TestCgrCfgJSONDefaultsUserS(t *testing.T) {
func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) {
eResLiCfg := &ResourceLimiterConfig{
Enabled: false,
CDRStatConns: []*HaPoolConfig{},
CacheDumpInterval: 0 * time.Second,
Enabled: false,
CDRStatConns: []*HaPoolConfig{},
StoreInterval: 0,
}
if !reflect.DeepEqual(cgrCfg.resourceLimiterCfg, eResLiCfg) {

View File

@@ -378,15 +378,15 @@ type UserServJsonCfg struct {
// ResourceLimiter service config section
type ResourceLimiterServJsonCfg struct {
Enabled *bool
Cdrstats_conns *[]*HaPoolJsonCfg
Cache_dump_interval *string
Enabled *bool
Cdrstats_conns *[]*HaPoolJsonCfg
Store_interval *string
}
// Stat service config section
type StatServJsonCfg struct {
Enabled *bool
Cache_dump_interval *string
Enabled *bool
Store_interval *string
}
// Mailer config section

View File

@@ -24,9 +24,9 @@ import (
)
type ResourceLimiterConfig struct {
Enabled bool
CDRStatConns []*HaPoolConfig // Connections towards CDRStatS
CacheDumpInterval time.Duration // Dump regularly from cache into dataDB
Enabled bool
CDRStatConns []*HaPoolConfig // Connections towards CDRStatS
StoreInterval time.Duration // Dump regularly from cache into dataDB
}
func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJsonCfg) (err error) {
@@ -43,8 +43,8 @@ func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJ
rlcfg.CDRStatConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Cache_dump_interval != nil {
if rlcfg.CacheDumpInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Cache_dump_interval); err != nil {
if jsnCfg.Store_interval != nil {
if rlcfg.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil {
return err
}
}

View File

@@ -24,8 +24,8 @@ import (
)
type StatSCfg struct {
Enabled bool
CacheDumpInterval time.Duration // Dump regularly from cache into dataDB
Enabled bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
}
func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
@@ -35,8 +35,8 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
if jsnCfg.Enabled != nil {
st.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Cache_dump_interval != nil {
if st.CacheDumpInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Cache_dump_interval); err != nil {
if jsnCfg.Store_interval != nil {
if st.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil {
return err
}
}

View File

@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"reflect"
"github.com/cgrates/cgrates/utils"
@@ -187,6 +188,19 @@ type LoadWriter interface {
SetTPStats([]*utils.TPStats) error
}
// NewMarshaler returns the marshaler type selected by mrshlerStr
func NewMarshaler(mrshlerStr string) (ms Marshaler, err error) {
switch mrshlerStr {
case utils.MSGPACK:
ms = NewCodecMsgpackMarshaler()
case utils.JSON:
ms = new(JSONMarshaler)
default:
err = fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
return
}
type Marshaler interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error

View File

@@ -72,13 +72,16 @@ type StatService struct {
stInsts StatsInstances // ordered list of StatsQueues
}
// Called to start the service
func (ss *StatService) ListenAndServe() error {
// ListenAndServe loops keeps the service alive
func (ss *StatService) ListenAndServe(exitChan chan bool) error {
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
}
// Called to shutdown the service
func (ss *StatService) ServiceShutdown() error {
// ToDo: improve with context, ie, following http implementation
func (ss *StatService) Shutdown() error {
close(ss.stopStoring)
ss.storeMetrics()
return nil