diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1625f2ba8..a100dc30a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -39,6 +39,7 @@ import ( "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessionmanager" + "github.com/cgrates/cgrates/stats" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -545,6 +546,25 @@ func startResourceLimiterService(internalRLSChan, internalCdrStatSChan chan rpcc internalRLSChan <- rlsV1 } +// startStatService fires up the StatS +func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, + dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) { + sts, err := stats.NewStatService(dataDB, ms, cfg.StatSCfg().StoreInterval) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) + exitChan <- true + return + } + utils.Logger.Info(fmt.Sprintf("Starting Stat service")) + go func() { + if err := sts.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Error: %s listening for packets", err.Error())) + exitChan <- true + return + } + }() +} + func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLsChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) { @@ -656,6 +676,17 @@ func main() { // Init cache cache.NewCache(cfg.CacheConfig) + var ms engine.Marshaler + if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil { + log.Fatalf("error initializing marshaler: ", err) + return + } + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + var dataDB engine.DataDB var loadDb engine.LoadStorage var cdrDb engine.CdrStorage @@ -709,6 +740,7 @@ func main() { internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan *sessionmanager.SMGeneric, 1) internalRLSChan := make(chan rpcclient.RpcClientConnection, 1) + internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan) @@ -802,6 +834,10 @@ func main() { go startResourceLimiterService(internalRLSChan, internalCdrStatSChan, cfg, dataDB, server, exitChan) } + if cfg.StatSCfg().Enabled { + go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan) + } + // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalSMGChan) diff --git a/config/config.go b/config/config.go index 734211720..71b93befb 100755 --- a/config/config.go +++ b/config/config.go @@ -625,6 +625,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } + jsnStatSCfg, err := jsnCfg.StatSJsonCfg() + if err != nil { + return err + } + jsnMailerCfg, err := jsnCfg.MailerJsonCfg() if err != nil { return err @@ -1052,6 +1057,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } } + if jsnStatSCfg != nil { + if self.statsCfg == nil { + self.statsCfg = new(StatSCfg) + } + if self.statsCfg.loadFromJsonCfg(jsnStatSCfg); err != nil { + return err + } + } + if jsnUserServCfg != nil { if jsnUserServCfg.Enabled != nil { self.UserServerEnabled = *jsnUserServCfg.Enabled @@ -1110,6 +1124,7 @@ func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig { return self.resourceLimiterCfg } +// ToDo: fix locking func (cfg *CGRConfig) StatSCfg() *StatSCfg { return cfg.statsCfg } diff --git a/config/config_defaults.go b/config/config_defaults.go index 7f07b6c54..4682a9354 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -405,13 +405,13 @@ const CGRATES_CFG_JSON = ` "rls": { "enabled": false, // starts ResourceLimiter service: . "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> - "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> }, "stats": { - "enabled": false, // starts ResourceLimiter service: . - "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "enabled": false, // starts Stat service: . + "store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> }, diff --git a/config/config_json_test.go b/config/config_json_test.go index bb50134a2..72da175e9 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -665,9 +665,9 @@ func TestDfUserServJsonCfg(t *testing.T) { func TestDfResourceLimiterSJsonCfg(t *testing.T) { eCfg := &ResourceLimiterServJsonCfg{ - Enabled: utils.BoolPointer(false), - Cdrstats_conns: &[]*HaPoolJsonCfg{}, - Cache_dump_interval: utils.StringPointer("0s"), + Enabled: utils.BoolPointer(false), + Cdrstats_conns: &[]*HaPoolJsonCfg{}, + Store_interval: utils.StringPointer("0s"), } if cfg, err := dfCgrJsonCfg.ResourceLimiterJsonCfg(); err != nil { t.Error(err) @@ -678,8 +678,8 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) { func TestDfStatServiceJsonCfg(t *testing.T) { eCfg := &StatServJsonCfg{ - Enabled: utils.BoolPointer(false), - Cache_dump_interval: utils.StringPointer("0s"), + Enabled: utils.BoolPointer(false), + Store_interval: utils.StringPointer("0s"), } if cfg, err := dfCgrJsonCfg.StatSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 4bffb4342..7f5c2f3fd 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -553,9 +553,9 @@ func TestCgrCfgJSONDefaultsUserS(t *testing.T) { func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) { eResLiCfg := &ResourceLimiterConfig{ - Enabled: false, - CDRStatConns: []*HaPoolConfig{}, - CacheDumpInterval: 0 * time.Second, + Enabled: false, + CDRStatConns: []*HaPoolConfig{}, + StoreInterval: 0, } if !reflect.DeepEqual(cgrCfg.resourceLimiterCfg, eResLiCfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 6af793245..e546c524b 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -378,15 +378,15 @@ type UserServJsonCfg struct { // ResourceLimiter service config section type ResourceLimiterServJsonCfg struct { - Enabled *bool - Cdrstats_conns *[]*HaPoolJsonCfg - Cache_dump_interval *string + Enabled *bool + Cdrstats_conns *[]*HaPoolJsonCfg + Store_interval *string } // Stat service config section type StatServJsonCfg struct { - Enabled *bool - Cache_dump_interval *string + Enabled *bool + Store_interval *string } // Mailer config section diff --git a/config/reslimitercfg.go b/config/reslimitercfg.go index 115b6cf92..2a69575f4 100644 --- a/config/reslimitercfg.go +++ b/config/reslimitercfg.go @@ -24,9 +24,9 @@ import ( ) type ResourceLimiterConfig struct { - Enabled bool - CDRStatConns []*HaPoolConfig // Connections towards CDRStatS - CacheDumpInterval time.Duration // Dump regularly from cache into dataDB + Enabled bool + CDRStatConns []*HaPoolConfig // Connections towards CDRStatS + StoreInterval time.Duration // Dump regularly from cache into dataDB } func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJsonCfg) (err error) { @@ -43,8 +43,8 @@ func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJ rlcfg.CDRStatConns[idx].loadFromJsonCfg(jsnHaCfg) } } - if jsnCfg.Cache_dump_interval != nil { - if rlcfg.CacheDumpInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Cache_dump_interval); err != nil { + if jsnCfg.Store_interval != nil { + if rlcfg.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil { return err } } diff --git a/config/statscfg.go b/config/statscfg.go index 851d4776b..551178ebc 100644 --- a/config/statscfg.go +++ b/config/statscfg.go @@ -24,8 +24,8 @@ import ( ) type StatSCfg struct { - Enabled bool - CacheDumpInterval time.Duration // Dump regularly from cache into dataDB + Enabled bool + StoreInterval time.Duration // Dump regularly from cache into dataDB } func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { @@ -35,8 +35,8 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { if jsnCfg.Enabled != nil { st.Enabled = *jsnCfg.Enabled } - if jsnCfg.Cache_dump_interval != nil { - if st.CacheDumpInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Cache_dump_interval); err != nil { + if jsnCfg.Store_interval != nil { + if st.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil { return err } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 2cb49a20f..025476851 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -21,6 +21,7 @@ import ( "bytes" "encoding/gob" "encoding/json" + "fmt" "reflect" "github.com/cgrates/cgrates/utils" @@ -187,6 +188,19 @@ type LoadWriter interface { SetTPStats([]*utils.TPStats) error } +// NewMarshaler returns the marshaler type selected by mrshlerStr +func NewMarshaler(mrshlerStr string) (ms Marshaler, err error) { + switch mrshlerStr { + case utils.MSGPACK: + ms = NewCodecMsgpackMarshaler() + case utils.JSON: + ms = new(JSONMarshaler) + default: + err = fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + } + return +} + type Marshaler interface { Marshal(v interface{}) ([]byte, error) Unmarshal(data []byte, v interface{}) error diff --git a/stats/service.go b/stats/service.go index e48e7509d..9fc03937c 100644 --- a/stats/service.go +++ b/stats/service.go @@ -72,13 +72,16 @@ type StatService struct { stInsts StatsInstances // ordered list of StatsQueues } -// Called to start the service -func (ss *StatService) ListenAndServe() error { +// ListenAndServe loops keeps the service alive +func (ss *StatService) ListenAndServe(exitChan chan bool) error { + e := <-exitChan + exitChan <- e // put back for the others listening for shutdown request return nil } // Called to shutdown the service -func (ss *StatService) ServiceShutdown() error { +// ToDo: improve with context, ie, following http implementation +func (ss *StatService) Shutdown() error { close(ss.stopStoring) ss.storeMetrics() return nil