This commit is contained in:
Edwardro22
2017-08-03 01:47:21 +03:00
19 changed files with 309 additions and 46 deletions

View File

@@ -57,6 +57,7 @@ information, please see the [`CONTRIBUTING.md`](CONTRIBUTING.md) file.
| @alin104n | Alin Ioanovici |
| @wasimbaig | Wasim Baig |
| @MrGab | Gabriele Proni |
| @TeoV | Teofil Voivozeanu |
<!-- to sign, include a single line above this comment containing the following text:
| @username | First Last |

68
apier/v1/stats.go Normal file
View File

@@ -0,0 +1,68 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"reflect"
"strings"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/stats"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewStatSV1 initializes StatSV1
func NewStatSV1(sts *stats.StatService) *StatSV1 {
return &StatSV1{sts: sts}
}
// Exports RPC from RLs
type StatSV1 struct {
sts *stats.StatService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
method := reflect.ValueOf(stsv1).MethodByName(methodSplit[1])
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}
// GetLimitsForEvent returns ResourceLimits matching a specific event
func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error {
return stsv1.sts.V1ProcessEvent(ev, reply)
}

View File

@@ -52,7 +52,7 @@ var sTestsCDRsIT = []func(t *testing.T){
testV2CDRsRateWithoutTP,
testV2CDRsLoadTariffPlanFromFolder,
testV2CDRsRateWithTP,
engine.KillEngineTest,
// ToDo: test engine shutdown
}
// Tests starting here

View File

@@ -53,7 +53,7 @@ var sTestsTutIT = []func(t *testing.T){
testTPitRpcConn,
testTPitTimings,
testTPitDestinations,
engine.KillEngineTest,
// ToDo: test engine shutdown
}
// Tests starting here

View File

@@ -39,6 +39,7 @@ import (
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/stats"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -545,9 +546,32 @@ func startResourceLimiterService(internalRLSChan, internalCdrStatSChan chan rpcc
internalRLSChan <- rlsV1
}
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) {
sts, err := stats.NewStatService(dataDB, ms, cfg.StatSCfg().StoreInterval)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting Stat service"))
go func() {
if err := sts.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Error: %s listening for packets", err.Error()))
}
sts.Shutdown()
exitChan <- true
return
}()
stsV1 := v1.NewStatSV1(sts)
server.RpcRegister(stsV1)
internalStatSChan <- stsV1
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRLsChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
internalAliaseSChan, internalRLsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -567,6 +591,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalSMGChan <- smg
case rls := <-internalRLsChan:
internalRLsChan <- rls
case statS := <-internalStatSChan:
internalStatSChan <- statS
}
go server.ServeJSON(cfg.RPCJSONListen)
go server.ServeGOB(cfg.RPCGOBListen)
@@ -656,6 +682,17 @@ func main() {
// Init cache
cache.NewCache(cfg.CacheConfig)
var ms engine.Marshaler
if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil {
log.Fatalf("error initializing marshaler: ", err)
return
}
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
var dataDB engine.DataDB
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
@@ -709,6 +746,7 @@ func main() {
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalRLSChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan)
@@ -802,9 +840,13 @@ func main() {
go startResourceLimiterService(internalRLSChan, internalCdrStatSChan, cfg, dataDB, server, exitChan)
}
if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalSMGChan)
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalStatSChan, internalSMGChan)
<-exitChan
if *pidFile != "" {

View File

@@ -264,6 +264,7 @@ type CGRConfig struct {
UserServerEnabled bool // Starts User as server: <true|false>
UserServerIndexes []string // List of user profile field indexes
resourceLimiterCfg *ResourceLimiterConfig // Configuration for resource limiter
statsCfg *StatSCfg // Configuration for StatS
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
MailerAuthPass string // Authenticate to email server with this password
@@ -624,6 +625,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
return err
}
jsnStatSCfg, err := jsnCfg.StatSJsonCfg()
if err != nil {
return err
}
jsnMailerCfg, err := jsnCfg.MailerJsonCfg()
if err != nil {
return err
@@ -1051,6 +1057,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
if jsnStatSCfg != nil {
if self.statsCfg == nil {
self.statsCfg = new(StatSCfg)
}
if self.statsCfg.loadFromJsonCfg(jsnStatSCfg); err != nil {
return err
}
}
if jsnUserServCfg != nil {
if jsnUserServCfg.Enabled != nil {
self.UserServerEnabled = *jsnUserServCfg.Enabled
@@ -1109,6 +1124,11 @@ func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig {
return self.resourceLimiterCfg
}
// ToDo: fix locking
func (cfg *CGRConfig) StatSCfg() *StatSCfg {
return cfg.statsCfg
}
// ToDo: fix locking here
func (self *CGRConfig) SMAsteriskCfg() *SMAsteriskCfg {
cfgChan := <-self.ConfigReloads[utils.SMAsterisk] // Lock config for read or reloads

View File

@@ -405,7 +405,13 @@ const CGRATES_CFG_JSON = `
"rls": {
"enabled": false, // starts ResourceLimiter service: <true|false>.
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
},
"stats": {
"enabled": false, // starts Stat service: <true|false>.
"store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
},

View File

@@ -23,6 +23,7 @@ import (
"os"
"github.com/DisposaBoy/JsonConfigReader"
"github.com/cgrates/cgrates/utils"
)
const (
@@ -358,6 +359,18 @@ func (self CgrJsonCfg) ResourceLimiterJsonCfg() (*ResourceLimiterServJsonCfg, er
return cfg, nil
}
func (self CgrJsonCfg) StatSJsonCfg() (*StatServJsonCfg, error) {
rawCfg, hasKey := self[utils.StatS]
if !hasKey {
return nil, nil
}
cfg := new(StatServJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) {
rawCfg, hasKey := self[MAILER_JSN]
if !hasKey {

View File

@@ -665,9 +665,9 @@ func TestDfUserServJsonCfg(t *testing.T) {
func TestDfResourceLimiterSJsonCfg(t *testing.T) {
eCfg := &ResourceLimiterServJsonCfg{
Enabled: utils.BoolPointer(false),
Cdrstats_conns: &[]*HaPoolJsonCfg{},
Cache_dump_interval: utils.StringPointer("0s"),
Enabled: utils.BoolPointer(false),
Cdrstats_conns: &[]*HaPoolJsonCfg{},
Store_interval: utils.StringPointer("0s"),
}
if cfg, err := dfCgrJsonCfg.ResourceLimiterJsonCfg(); err != nil {
t.Error(err)
@@ -676,6 +676,18 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) {
}
}
func TestDfStatServiceJsonCfg(t *testing.T) {
eCfg := &StatServJsonCfg{
Enabled: utils.BoolPointer(false),
Store_interval: utils.StringPointer("0s"),
}
if cfg, err := dfCgrJsonCfg.StatSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfMailerJsonCfg(t *testing.T) {
eCfg := &MailerJsonCfg{
Server: utils.StringPointer("localhost"),

View File

@@ -553,9 +553,9 @@ func TestCgrCfgJSONDefaultsUserS(t *testing.T) {
func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) {
eResLiCfg := &ResourceLimiterConfig{
Enabled: false,
CDRStatConns: []*HaPoolConfig{},
CacheDumpInterval: 0 * time.Second,
Enabled: false,
CDRStatConns: []*HaPoolConfig{},
StoreInterval: 0,
}
if !reflect.DeepEqual(cgrCfg.resourceLimiterCfg, eResLiCfg) {

View File

@@ -378,9 +378,15 @@ type UserServJsonCfg struct {
// ResourceLimiter service config section
type ResourceLimiterServJsonCfg struct {
Enabled *bool
Cdrstats_conns *[]*HaPoolJsonCfg
Cache_dump_interval *string
Enabled *bool
Cdrstats_conns *[]*HaPoolJsonCfg
Store_interval *string
}
// Stat service config section
type StatServJsonCfg struct {
Enabled *bool
Store_interval *string
}
// Mailer config section

View File

@@ -24,9 +24,9 @@ import (
)
type ResourceLimiterConfig struct {
Enabled bool
CDRStatConns []*HaPoolConfig // Connections towards CDRStatS
CacheDumpInterval time.Duration // Dump regularly from cache into dataDB
Enabled bool
CDRStatConns []*HaPoolConfig // Connections towards CDRStatS
StoreInterval time.Duration // Dump regularly from cache into dataDB
}
func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJsonCfg) (err error) {
@@ -43,8 +43,8 @@ func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJ
rlcfg.CDRStatConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Cache_dump_interval != nil {
if rlcfg.CacheDumpInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Cache_dump_interval); err != nil {
if jsnCfg.Store_interval != nil {
if rlcfg.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil {
return err
}
}

44
config/statscfg.go Normal file
View File

@@ -0,0 +1,44 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"time"
"github.com/cgrates/cgrates/utils"
)
type StatSCfg struct {
Enabled bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
}
func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
st.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Store_interval != nil {
if st.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,29 @@
{
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"stats": {
"enabled": true,
"store_interval": "0s",
},
}

View File

@@ -25,7 +25,6 @@ import (
"os"
"os/exec"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
@@ -95,13 +94,6 @@ func KillEngine(waitEngine int) error {
return nil
}
// KillEngineTest is included in tests to shutdown the CGRateS processes
func KillEngineTest(t *testing.T) {
if err := KillEngine(100); err != nil {
t.Error(err)
}
}
func StopStartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
KillEngine(waitEngine)
return StartEngine(cfgPath, waitEngine)

View File

@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"reflect"
"github.com/cgrates/cgrates/utils"
@@ -192,6 +193,19 @@ type LoadWriter interface {
SetTPThresholdCfg([]*utils.TPThresholdCfg) error
}
// NewMarshaler returns the marshaler type selected by mrshlerStr
func NewMarshaler(mrshlerStr string) (ms Marshaler, err error) {
switch mrshlerStr {
case utils.MSGPACK:
ms = NewCodecMsgpackMarshaler()
case utils.JSON:
ms = new(JSONMarshaler)
default:
err = fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
return
}
type Marshaler interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error

View File

@@ -443,8 +443,10 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
}
}
for i, s := range ss[initialID] {
s.mux.Lock()
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing
s.mux.Unlock()
smg.recordASession(s)
if i == 0 {
smg.unrecordASession(initialID)

View File

@@ -72,34 +72,23 @@ type StatService struct {
stInsts StatsInstances // ordered list of StatsQueues
}
// Called to start the service
func (ss *StatService) ListenAndServe() error {
// ListenAndServe loops keeps the service alive
func (ss *StatService) ListenAndServe(exitChan chan bool) error {
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
}
// Called to shutdown the service
func (ss *StatService) ServiceShutdown() error {
// ToDo: improve with context, ie, following http implementation
func (ss *StatService) Shutdown() error {
utils.Logger.Info("<StatS> service shutdown initialized")
close(ss.stopStoring)
ss.storeMetrics()
utils.Logger.Info("<StatS> service shutdown complete")
return nil
}
// processEvent processes a StatsEvent through the queues and caches it when needed
func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
evStatsID := ev.ID()
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
}
for _, stInst := range ss.stInsts {
if err := stInst.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
stInst.cfg.ID, evStatsID, err.Error()))
}
}
return
}
// store stores the necessary storedMetrics to dataDB
func (ss *StatService) storeMetrics() {
for _, si := range ss.stInsts {
@@ -130,3 +119,27 @@ func (ss *StatService) dumpStoredMetrics() {
time.Sleep(ss.storeInterval)
}
}
// processEvent processes a StatsEvent through the queues and caches it when needed
func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
evStatsID := ev.ID()
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
}
for _, stInst := range ss.stInsts {
if err := stInst.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
stInst.cfg.ID, evStatsID, err.Error()))
}
}
return
}
// V1ProcessEvent implements StatV1 method for processing an Event
func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err error) {
if err = ss.processEvent(ev); err == nil {
*reply = utils.OK
}
return
}

View File

@@ -422,6 +422,7 @@ const (
CacheDerivedChargers = "derived_chargers"
CacheResourceLimits = "resource_limits"
CacheTimings = "timings"
StatS = "stats"
)
func buildCacheInstRevPrefixes() {