diff --git a/apier/v1/stats.go b/apier/v1/stats.go index ccea7b106..3c46e457f 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -18,25 +18,61 @@ along with this program. If not, see package v1 -/* import ( "reflect" "strings" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/stats" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) +// GetStatQueueProfile returns a StatQueue profile +func (apierV1 *ApierV1) GetStatQueueProfile(arg *utils.TenantID, reply *engine.StatQueueProfile) (err error) { + if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if sCfg, err := apierV1.DataDB.GetStatQueueProfile(arg.Tenant, arg.ID, + false, utils.NonTransactional); err != nil { + return utils.APIErrorHandler(err) + } else { + *reply = *sCfg + } + return +} + +// SetStatQueueProfile alters/creates a StatQueueProfile +func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply *string) error { + if missing := utils.MissingStructFields(sqp, []string{"Tenant", "ID"}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if err := apierV1.DataDB.SetStatQueueProfile(sqp); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +// Remove a specific stat configuration +func (apierV1 *ApierV1) RemStatQueueProfile(args *utils.TenantID, reply *string) error { + if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if err := apierV1.DataDB.RemStatQueueProfile(args.Tenant, args.ID, utils.NonTransactional); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + // NewStatSV1 initializes StatSV1 -func NewStatSV1(sts *stats.StatService) *StatSV1 { - return &StatSV1{sts: sts} +func NewStatSV1(sS *engine.StatService) *StatSV1 { + return &StatSV1{sS: sS} } // Exports RPC from RLs type StatSV1 struct { - sts *stats.StatService + sS *engine.StatService } // Call implements rpcclient.RpcClientConnection interface for internal RPC @@ -65,77 +101,21 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa } // ProcessEvent returns processes a new Event -func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error { - return stsv1.sts.V1ProcessEvent(ev, reply) +func (stsv1 *StatSV1) ProcessEvent(ev *engine.StatEvent, reply *string) error { + return stsv1.sS.V1ProcessEvent(ev, reply) } // GetQueueIDs returns the list of queues IDs in the system -func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - return stsv1.sts.V1GetQueueIDs(ignored, reply) +func (stsv1 *StatSV1) GetStatQueuesForEvent(ev *engine.StatEvent, reply *engine.StatQueues) (err error) { + return stsv1.sS.V1GetStatQueuesForEvent(ev, reply) } -// GetStatMetrics returns the metrics for a queueID -func (stsv1 *StatSV1) GetStringMetrics(queueID string, reply *map[string]string) (err error) { - return stsv1.sts.V1GetStringMetrics(queueID, reply) +// GetStringMetrics returns the string metrics for a Queue +func (stsv1 *StatSV1) GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) { + return stsv1.sS.V1GetQueueStringMetrics(args, reply) } -// GetStatMetrics returns the metrics for a queueID -func (stsv1 *StatSV1) GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - return stsv1.sts.V1GetFloatMetrics(queueID, reply) +// GetQueueFloatMetrics returns the float metrics for a Queue +func (stsv1 *StatSV1) GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) { + return stsv1.sS.V1GetQueueFloatMetrics(args, reply) } - -// LoadQueues loads from dataDB into statsService the queueIDs specified -// loads all when qIDs is nil -func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) { - return stsv1.sts.V1LoadQueues(args, reply) -} - -type AttrGetStatsCfg struct { - ID string -} - -//GetStatConfig returns a stat configuration -func (apierV1 *ApierV1) GetStatQueueProfile(attr AttrGetStatsCfg, reply *engine.StatQueueProfile) error { - if missing := utils.MissingStructFields(&attr, []string{"ID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - if sCfg, err := apierV1.DataDB.GetStatQueueProfile(attr.ID); err != nil { - if err.Error() != utils.ErrNotFound.Error() { - err = utils.NewErrServerError(err) - } - return err - } else { - *reply = *sCfg - } - return nil -} - -//SetStatConfig add a new stat configuration -func (apierV1 *ApierV1) SetStatQueueProfile(attr *engine.StatQueueProfile, reply *string) error { - if missing := utils.MissingStructFields(attr, []string{"ID"}); len(missing) != 0 { - return utils.NewErrMandatoryIeMissing(missing...) - } - if err := apierV1.DataDB.SetStatQueueProfile(attr); err != nil { - return utils.APIErrorHandler(err) - } - *reply = utils.OK - return nil - -} - -//Remove a specific stat configuration -func (apierV1 *ApierV1) RemStatQueueProfile(attrs AttrGetStatsCfg, reply *string) error { - if missing := utils.MissingStructFields(&attrs, []string{"ID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - if err := apierV1.DataDB.RemStatQueueProfile(attrs.ID); err != nil { - if err.Error() != utils.ErrNotFound.Error() { - err = utils.NewErrServerError(err) - } - return err - } - *reply = utils.OK - return nil - -} -*/ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0c2968f08..80b555f40 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -560,30 +560,28 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl internalRsChan <- rsV1 } -/* // startStatService fires up the StatS func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) { - sts, err := stats.NewStatService(dataDB, ms, cfg.StatSCfg().StoreInterval) + dm *engine.DataManager, server *utils.Server, exitChan chan bool) { + sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true return } - utils.Logger.Info(fmt.Sprintf("Starting Stat service")) + utils.Logger.Info(fmt.Sprintf("Starting Stat Service")) go func() { - if err := sts.ListenAndServe(exitChan); err != nil { + if err := sS.ListenAndServe(exitChan); err != nil { utils.Logger.Crit(fmt.Sprintf(" Error: %s listening for packets", err.Error())) } - sts.Shutdown() + sS.Shutdown() exitChan <- true return }() - stsV1 := v1.NewStatSV1(sts) + stsV1 := v1.NewStatSV1(sS) server.RpcRegister(stsV1) internalStatSChan <- stsV1 } -*/ func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, @@ -703,6 +701,7 @@ func main() { var dataDB engine.DataDB var loadDb engine.LoadStorage var cdrDb engine.CdrStorage + var dm *engine.DataManager if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerEnabled { dataDB, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, @@ -736,6 +735,8 @@ func main() { } } + dm = engine.NewDataManager(dataDB) + // Done initing DBs engine.SetRoundingDecimals(cfg.RoundingDecimals) engine.SetRpSubjectPrefixMatching(cfg.RpSubjectPrefixMatching) engine.SetLcrSubjectPrefixMatching(cfg.LcrSubjectPrefixMatching) @@ -854,9 +855,9 @@ func main() { internalStatSChan, cfg, dataDB, server, exitChan) } - //if cfg.StatSCfg().Enabled { - // go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan) - //} + if cfg.StatSCfg().Enabled { + go startStatService(internalStatSChan, cfg, dm, server, exitChan) + } // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 18963eb3b..50800bd45 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -120,8 +120,8 @@ "stats": { - "enabled": false, - "store_interval": "0s", + "enabled": true, + "store_interval": "1s", }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 0fcc01a1c..a2414e4e1 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -113,8 +113,8 @@ "stats": { - "enabled": false, - "store_interval": "0s", + "enabled": true, + "store_interval": "1s", }, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index eb438b758..634b3af0b 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -76,6 +76,12 @@ }, +"stats": { + "enabled": true, + "store_interval": "1s", +}, + + "sm_generic": { "enabled": true, }, diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 93311a0e6..6832735d3 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12] -Tester,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 \ No newline at end of file +cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 \ No newline at end of file diff --git a/engine/stats.go b/engine/stats.go index 9275e6f93..1d878755a 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "errors" "fmt" "math/rand" "reflect" @@ -128,6 +129,9 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) { + if ev.Tenant == "" { + return nil, errors.New("missing Tenant information") + } matchingSQs := make(map[string]*StatQueue) sqIDs, err := matchingItemIDsForEvent(ev.Fields, sS.dm.DataDB(), utils.StatQueuesStringIndex+ev.Tenant) if err != nil { @@ -191,7 +195,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues } // Call implements rpcclient.RpcClientConnection interface for internal RPC -// here for testing purposes +// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS) func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { methodSplit := strings.Split(serviceMethod, ".") if len(methodSplit) != 2 { @@ -216,55 +220,68 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf return err } -/* - - -// processEvent processes a StatsEvent through the queues and caches it when needed -func (ss *StatService) processEvent(ev StatsEvent) (err error) { - evStatsID := ev.ID() - if evStatsID == "" { // ID is mandatory - return errors.New("missing ID field") +// processEvent processes a new event, dispatching to matching queues +// queues matching are also cached to speed up +func (sS *StatService) processEvent(ev *StatEvent) (err error) { + if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - for _, stInst := range sS.queues { - if err := stInst.ProcessEvent(ev); err != nil { + matchSQs, err := sS.matchingStatQueuesForEvent(ev) + if err != nil { + return err + } + if len(matchSQs) == 0 { + return utils.ErrNotFound + } + var withErrors bool + for _, sq := range matchSQs { + if err = sq.ProcessEvent(ev); err != nil { utils.Logger.Warning( - fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stInst.cfg.ID, evStatsID, err.Error())) + fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", + sq.TenantID(), ev.TenantID(), err.Error())) + withErrors = true } - if stInst.cfg.Blocker { + if sq.sqPrfl.Blocker { break } } + if withErrors { + err = utils.ErrPartiallyExecuted + } return } // V1ProcessEvent implements StatV1 method for processing an Event -func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) { +func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) { if err = sS.processEvent(ev); err == nil { *reply = utils.OK } return } -// V1GetQueueIDs returns list of queue IDs configured in the service -func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - if len(sS.queuesCache) == 0 { - return utils.ErrNotFound - } - for k := range sS.queuesCache { - *reply = append(*reply, k) +// V1StatQueuesForEvent implements StatV1 method for processing an Event +func (sS *StatService) V1GetStatQueuesForEvent(ev *StatEvent, reply *StatQueues) (err error) { + var sQs StatQueues + if sQs, err = sS.matchingStatQueuesForEvent(ev); err == nil { + *reply = sQs } return } -// V1GetStringMetrics returns the metrics as string values -func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { - sq, has := sS.queuesCache[queueID] - if !has { - return utils.ErrNotFound +// V1GetQueueStringMetrics returns the metrics of a Queue as string values +func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) { + if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - metrics := make(map[string]string, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { + sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") + if err != nil { + if err != utils.ErrNotFound { + err = utils.NewErrServerError(err) + } + return err + } + metrics := make(map[string]string, len(sq.SQMetrics)) + for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetStringValue("") } *reply = metrics @@ -272,16 +289,21 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri } // V1GetFloatMetrics returns the metrics as float64 values -func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - sq, has := sS.queuesCache[queueID] - if !has { - return utils.ErrNotFound +func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) { + if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - metrics := make(map[string]float64, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { + sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") + if err != nil { + if err != utils.ErrNotFound { + err = utils.NewErrServerError(err) + } + return err + } + metrics := make(map[string]float64, len(sq.SQMetrics)) + for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetFloat64Value() } *reply = metrics return } -*/ diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 20f104991..9c2073ce6 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2086,7 +2086,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Printf("Indexed ResourceProfile tenant: %s keys: %+v", tenant, rlIdxr.ChangedKeys().Slice()) + log.Printf("Indexed ResourceProfile tenant: %s, keys: %+v", tenant, rlIdxr.ChangedKeys().Slice()) } if err := rlIdxr.StoreIndexes(); err != nil { return err @@ -2110,7 +2110,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Printf("Indexed Stats tenant: %s, keys %+v", stIdxr.ChangedKeys().Slice()) + log.Printf("Indexed Stats tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice()) } if err := stIdxr.StoreIndexes(); err != nil { return err @@ -2133,7 +2133,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Printf("Indexed Stats keys: %+v", stIdxr.ChangedKeys().Slice()) + log.Printf("Indexed Threshold keys: %+v", stIdxr.ChangedKeys().Slice()) } if err := stIdxr.StoreIndexes(); err != nil { return err diff --git a/utils/errors.go b/utils/errors.go index fe7708d67..54471678e 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -24,7 +24,7 @@ import ( ) var ( - ErrNoMoreData = errors.New("no more data") + ErrNoMoreData = errors.New("NO_MORE_DATA") ErrNotImplemented = errors.New("NOT_IMPLEMENTED") ErrNotFound = errors.New("NOT_FOUND") ErrTimedOut = errors.New("TIMED_OUT") @@ -45,6 +45,7 @@ var ( ErrNotConvertible = errors.New("NOT_CONVERTIBLE") ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") + ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") ) // NewCGRError initialises a new CGRError @@ -92,14 +93,14 @@ func NewErrServerError(err error) error { } // Centralized returns for APIs -func APIErrorHandler(err error) error { - cgrErr, ok := err.(*CGRError) +func APIErrorHandler(errIn error) (err error) { + cgrErr, ok := errIn.(*CGRError) if !ok { - if err == ErrNotFound { - return err - } else { - return NewErrServerError(err) + err = errIn + if err != ErrNotFound { + err = NewErrServerError(err) } + return } cgrErr.ActivateAPIError() return cgrErr