This commit is contained in:
TeoV
2017-09-21 11:05:28 +03:00
9 changed files with 143 additions and 133 deletions

View File

@@ -18,25 +18,61 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
/*
import (
"reflect"
"strings"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/stats"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// GetStatQueueProfile returns a StatQueue profile
func (apierV1 *ApierV1) GetStatQueueProfile(arg *utils.TenantID, reply *engine.StatQueueProfile) (err error) {
if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if sCfg, err := apierV1.DataDB.GetStatQueueProfile(arg.Tenant, arg.ID,
false, utils.NonTransactional); err != nil {
return utils.APIErrorHandler(err)
} else {
*reply = *sCfg
}
return
}
// SetStatQueueProfile alters/creates a StatQueueProfile
func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply *string) error {
if missing := utils.MissingStructFields(sqp, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := apierV1.DataDB.SetStatQueueProfile(sqp); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
// Remove a specific stat configuration
func (apierV1 *ApierV1) RemStatQueueProfile(args *utils.TenantID, reply *string) error {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := apierV1.DataDB.RemStatQueueProfile(args.Tenant, args.ID, utils.NonTransactional); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
// NewStatSV1 initializes StatSV1
func NewStatSV1(sts *stats.StatService) *StatSV1 {
return &StatSV1{sts: sts}
func NewStatSV1(sS *engine.StatService) *StatSV1 {
return &StatSV1{sS: sS}
}
// Exports RPC from RLs
type StatSV1 struct {
sts *stats.StatService
sS *engine.StatService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
@@ -65,77 +101,21 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa
}
// ProcessEvent returns processes a new Event
func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error {
return stsv1.sts.V1ProcessEvent(ev, reply)
func (stsv1 *StatSV1) ProcessEvent(ev *engine.StatEvent, reply *string) error {
return stsv1.sS.V1ProcessEvent(ev, reply)
}
// GetQueueIDs returns the list of queues IDs in the system
func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
return stsv1.sts.V1GetQueueIDs(ignored, reply)
func (stsv1 *StatSV1) GetStatQueuesForEvent(ev *engine.StatEvent, reply *engine.StatQueues) (err error) {
return stsv1.sS.V1GetStatQueuesForEvent(ev, reply)
}
// GetStatMetrics returns the metrics for a queueID
func (stsv1 *StatSV1) GetStringMetrics(queueID string, reply *map[string]string) (err error) {
return stsv1.sts.V1GetStringMetrics(queueID, reply)
// GetStringMetrics returns the string metrics for a Queue
func (stsv1 *StatSV1) GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
return stsv1.sS.V1GetQueueStringMetrics(args, reply)
}
// GetStatMetrics returns the metrics for a queueID
func (stsv1 *StatSV1) GetFloatMetrics(queueID string, reply *map[string]float64) (err error) {
return stsv1.sts.V1GetFloatMetrics(queueID, reply)
// GetQueueFloatMetrics returns the float metrics for a Queue
func (stsv1 *StatSV1) GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) {
return stsv1.sS.V1GetQueueFloatMetrics(args, reply)
}
// LoadQueues loads from dataDB into statsService the queueIDs specified
// loads all when qIDs is nil
func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
return stsv1.sts.V1LoadQueues(args, reply)
}
type AttrGetStatsCfg struct {
ID string
}
//GetStatConfig returns a stat configuration
func (apierV1 *ApierV1) GetStatQueueProfile(attr AttrGetStatsCfg, reply *engine.StatQueueProfile) error {
if missing := utils.MissingStructFields(&attr, []string{"ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if sCfg, err := apierV1.DataDB.GetStatQueueProfile(attr.ID); err != nil {
if err.Error() != utils.ErrNotFound.Error() {
err = utils.NewErrServerError(err)
}
return err
} else {
*reply = *sCfg
}
return nil
}
//SetStatConfig add a new stat configuration
func (apierV1 *ApierV1) SetStatQueueProfile(attr *engine.StatQueueProfile, reply *string) error {
if missing := utils.MissingStructFields(attr, []string{"ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := apierV1.DataDB.SetStatQueueProfile(attr); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
//Remove a specific stat configuration
func (apierV1 *ApierV1) RemStatQueueProfile(attrs AttrGetStatsCfg, reply *string) error {
if missing := utils.MissingStructFields(&attrs, []string{"ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := apierV1.DataDB.RemStatQueueProfile(attrs.ID); err != nil {
if err.Error() != utils.ErrNotFound.Error() {
err = utils.NewErrServerError(err)
}
return err
}
*reply = utils.OK
return nil
}
*/

View File

@@ -560,30 +560,28 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl
internalRsChan <- rsV1
}
/*
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) {
sts, err := stats.NewStatService(dataDB, ms, cfg.StatSCfg().StoreInterval)
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting Stat service"))
utils.Logger.Info(fmt.Sprintf("Starting Stat Service"))
go func() {
if err := sts.ListenAndServe(exitChan); err != nil {
if err := sS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Error: %s listening for packets", err.Error()))
}
sts.Shutdown()
sS.Shutdown()
exitChan <- true
return
}()
stsV1 := v1.NewStatSV1(sts)
stsV1 := v1.NewStatSV1(sS)
server.RpcRegister(stsV1)
internalStatSChan <- stsV1
}
*/
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
@@ -703,6 +701,7 @@ func main() {
var dataDB engine.DataDB
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
var dm *engine.DataManager
if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerEnabled {
dataDB, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
@@ -736,6 +735,8 @@ func main() {
}
}
dm = engine.NewDataManager(dataDB)
// Done initing DBs
engine.SetRoundingDecimals(cfg.RoundingDecimals)
engine.SetRpSubjectPrefixMatching(cfg.RpSubjectPrefixMatching)
engine.SetLcrSubjectPrefixMatching(cfg.LcrSubjectPrefixMatching)
@@ -854,9 +855,9 @@ func main() {
internalStatSChan, cfg, dataDB, server, exitChan)
}
//if cfg.StatSCfg().Enabled {
// go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
//}
if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, cfg, dm, server, exitChan)
}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,

View File

@@ -120,8 +120,8 @@
"stats": {
"enabled": false,
"store_interval": "0s",
"enabled": true,
"store_interval": "1s",
},

View File

@@ -113,8 +113,8 @@
"stats": {
"enabled": false,
"store_interval": "0s",
"enabled": true,
"store_interval": "1s",
},

View File

@@ -76,6 +76,12 @@
},
"stats": {
"enabled": true,
"store_interval": "1s",
},
"sm_generic": {
"enabled": true,
},

View File

@@ -1,2 +1,2 @@
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12]
Tester,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2
cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2
1 #Tenant[0] Id[1] FilterType[2] FilterFieldName[3] FilterFieldValues[4] ActivationInterval[5] QueueLength[6] TTL[7] Metrics[8] Blocker[9] Stored[10] Weight[11] Thresholds[12]
2 Tester cgrates.org Stats1 STATS_1 *string Account 1001;1002 2014-07-29T15:00:00Z 100 1s *asr;*acd;*acc true true 20 THRESH1;THRESH2

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"math/rand"
"reflect"
@@ -128,6 +129,9 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) {
if ev.Tenant == "" {
return nil, errors.New("missing Tenant information")
}
matchingSQs := make(map[string]*StatQueue)
sqIDs, err := matchingItemIDsForEvent(ev.Fields, sS.dm.DataDB(), utils.StatQueuesStringIndex+ev.Tenant)
if err != nil {
@@ -191,7 +195,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
// here for testing purposes
// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS)
func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
@@ -216,55 +220,68 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf
return err
}
/*
// processEvent processes a StatsEvent through the queues and caches it when needed
func (ss *StatService) processEvent(ev StatsEvent) (err error) {
evStatsID := ev.ID()
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
// processEvent processes a new event, dispatching to matching queues
// queues matching are also cached to speed up
func (sS *StatService) processEvent(ev *StatEvent) (err error) {
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
for _, stInst := range sS.queues {
if err := stInst.ProcessEvent(ev); err != nil {
matchSQs, err := sS.matchingStatQueuesForEvent(ev)
if err != nil {
return err
}
if len(matchSQs) == 0 {
return utils.ErrNotFound
}
var withErrors bool
for _, sq := range matchSQs {
if err = sq.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
stInst.cfg.ID, evStatsID, err.Error()))
fmt.Sprintf("<StatService> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), ev.TenantID(), err.Error()))
withErrors = true
}
if stInst.cfg.Blocker {
if sq.sqPrfl.Blocker {
break
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
return
}
// V1ProcessEvent implements StatV1 method for processing an Event
func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) {
func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) {
if err = sS.processEvent(ev); err == nil {
*reply = utils.OK
}
return
}
// V1GetQueueIDs returns list of queue IDs configured in the service
func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
if len(sS.queuesCache) == 0 {
return utils.ErrNotFound
}
for k := range sS.queuesCache {
*reply = append(*reply, k)
// V1StatQueuesForEvent implements StatV1 method for processing an Event
func (sS *StatService) V1GetStatQueuesForEvent(ev *StatEvent, reply *StatQueues) (err error) {
var sQs StatQueues
if sQs, err = sS.matchingStatQueuesForEvent(ev); err == nil {
*reply = sQs
}
return
}
// V1GetStringMetrics returns the metrics as string values
func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) {
sq, has := sS.queuesCache[queueID]
if !has {
return utils.ErrNotFound
// V1GetQueueStringMetrics returns the metrics of a Queue as string values
func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
metrics := make(map[string]string, len(sq.sqMetrics))
for metricID, metric := range sq.sqMetrics {
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue("")
}
*reply = metrics
@@ -272,16 +289,21 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri
}
// V1GetFloatMetrics returns the metrics as float64 values
func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) {
sq, has := sS.queuesCache[queueID]
if !has {
return utils.ErrNotFound
func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
metrics := make(map[string]float64, len(sq.sqMetrics))
for metricID, metric := range sq.sqMetrics {
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetFloat64Value()
}
*reply = metrics
return
}
*/

View File

@@ -2086,7 +2086,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
if verbose {
log.Printf("Indexed ResourceProfile tenant: %s keys: %+v", tenant, rlIdxr.ChangedKeys().Slice())
log.Printf("Indexed ResourceProfile tenant: %s, keys: %+v", tenant, rlIdxr.ChangedKeys().Slice())
}
if err := rlIdxr.StoreIndexes(); err != nil {
return err
@@ -2110,7 +2110,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
if verbose {
log.Printf("Indexed Stats tenant: %s, keys %+v", stIdxr.ChangedKeys().Slice())
log.Printf("Indexed Stats tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err
@@ -2133,7 +2133,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
if verbose {
log.Printf("Indexed Stats keys: %+v", stIdxr.ChangedKeys().Slice())
log.Printf("Indexed Threshold keys: %+v", stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err

View File

@@ -24,7 +24,7 @@ import (
)
var (
ErrNoMoreData = errors.New("no more data")
ErrNoMoreData = errors.New("NO_MORE_DATA")
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
ErrNotFound = errors.New("NOT_FOUND")
ErrTimedOut = errors.New("TIMED_OUT")
@@ -45,6 +45,7 @@ var (
ErrNotConvertible = errors.New("NOT_CONVERTIBLE")
ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE")
ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION")
ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED")
)
// NewCGRError initialises a new CGRError
@@ -92,14 +93,14 @@ func NewErrServerError(err error) error {
}
// Centralized returns for APIs
func APIErrorHandler(err error) error {
cgrErr, ok := err.(*CGRError)
func APIErrorHandler(errIn error) (err error) {
cgrErr, ok := errIn.(*CGRError)
if !ok {
if err == ErrNotFound {
return err
} else {
return NewErrServerError(err)
err = errIn
if err != ErrNotFound {
err = NewErrServerError(err)
}
return
}
cgrErr.ActivateAPIError()
return cgrErr