From ce454350595dab04cd3233358a171165c900c5e0 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 4 Aug 2017 09:52:36 +0200 Subject: [PATCH] Stats into RALs and RLS, GetFloat64Value added to stat metrics --- cmd/cgr-engine/cgr-engine.go | 3 +- cmd/cgr-engine/rater.go | 39 +++++++++----------- data/conf/samples/reslimiter/cgrates.json | 8 ++-- data/tariffplans/tutorial/ResourceLimits.csv | 4 +- stats/acd.go | 4 ++ stats/asr.go | 16 +++++--- stats/metric.go | 3 +- 7 files changed, 42 insertions(+), 35 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ed0e1f471..8aa4ba552 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -763,7 +763,8 @@ func main() { // Start rater service if cfg.RALsEnabled { - go startRater(internalRaterChan, cacheDoneChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, + go startRater(internalRaterChan, cacheDoneChan, internalCdrStatSChan, internalStatSChan, + internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, srvManager, server, dataDB, loadDb, cdrDb, &stopHandled, exitChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 6699adfdb..ad4879d15 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -29,21 +29,10 @@ import ( "github.com/cgrates/rpcclient" ) -/*func init() { - gob.Register(map[interface{}]struct{}{}) - gob.Register(engine.Actions{}) - gob.RegisterName("github.com/cgrates/cgrates/engine.ActionPlan", &engine.ActionPlan{}) - gob.Register([]*utils.LoadInstance{}) - gob.RegisterName("github.com/cgrates/cgrates/engine.RatingPlan", &engine.RatingPlan{}) - gob.RegisterName("github.com/cgrates/cgrates/engine.RatingProfile", &engine.RatingProfile{}) - gob.RegisterName("github.com/cgrates/cgrates/utils.DerivedChargers", &utils.DerivedChargers{}) - gob.Register(engine.AliasValues{}) -}*/ - // Starts rater and reports on chan func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, - internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection, - internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, + internalCdrStatSChan, internalStatSChan, internalHistorySChan, + internalPubSubSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dataDB engine.DataDB, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) { var waitTasks []chan struct{} @@ -53,14 +42,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cacheTaskChan) go func() { defer close(cacheTaskChan) - - /*loadHist, err := dataDB.GetLoadHistory(1, true, utils.NonTransactional) - if err != nil || len(loadHist) == 0 { - utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err)) - cacheDoneChan <- struct{}{} - return - } - */ var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rlIDs []string if cCfg, has := cfg.CacheConfig[utils.CacheDestinations]; !has || !cCfg.Precache { dstIDs = make([]string, 0) // Don't cache any @@ -115,7 +96,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - cacheDoneChan <- struct{}{} }() @@ -134,6 +114,21 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + var stats *rpcclient.RpcClientPool + if len(cfg.RALsStatSConns) != 0 { // Connections to CDRStats + statsTaskChan := make(chan struct{}) + waitTasks = append(waitTasks, statsTaskChan) + go func() { + defer close(statsTaskChan) + stats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.RALsStatSConns, internalStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS, error: %s", err.Error())) + exitChan <- true + return + } + }() + } if len(cfg.RALsHistorySConns) != 0 { // Connection to HistoryS, histTaskChan := make(chan struct{}) waitTasks = append(waitTasks, histTaskChan) diff --git a/data/conf/samples/reslimiter/cgrates.json b/data/conf/samples/reslimiter/cgrates.json index 769d7bb18..2435d6a75 100644 --- a/data/conf/samples/reslimiter/cgrates.json +++ b/data/conf/samples/reslimiter/cgrates.json @@ -18,20 +18,20 @@ "rals": { "enabled": true, - "cdrstats_conns": [ + "stats_conns": [ {"address": "*internal"} ], }, -"cdrstats": { +"stats": { "enabled": true, }, "rls": { "enabled": true, - "cdrstats_conns": [ - {"address": "*internal"} + "stats_conns": [ + //{"address": "*internal"} ], "cache_dump_interval": "0s", "usage_ttl": "3h", diff --git a/data/tariffplans/tutorial/ResourceLimits.csv b/data/tariffplans/tutorial/ResourceLimits.csv index 8376db496..0080bc46f 100755 --- a/data/tariffplans/tutorial/ResourceLimits.csv +++ b/data/tariffplans/tutorial/ResourceLimits.csv @@ -3,4 +3,6 @@ ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,1s,7,,true,true,20, ResGroup1,*string_prefix,Destination,10;20,,,,,,,, ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,,,,, ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,3600s,8,SPECIAL_1002,true,true,10, -ResGroup3,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,,,,, \ No newline at end of file +ResGroup3,*string,Account,3001,2014-07-29T15:00:00Z,1s,3,,true,true,20, +#ResGroup3,*timings,SetupTime,PEAK,,,,,,,, +#ResGroup3,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,,,,, \ No newline at end of file diff --git a/stats/acd.go b/stats/acd.go index 361e56f59..f2a6cdfb9 100644 --- a/stats/acd.go +++ b/stats/acd.go @@ -42,6 +42,10 @@ func (acd *ACD) GetValue() (v interface{}) { return } +func (acd *ACD) GetFloat64Value() (v float64) { + return float64(engine.STATS_NA) +} + func (acd *ACD) AddEvent(ev engine.StatsEvent) (err error) { return } diff --git a/stats/asr.go b/stats/asr.go index 224c2609e..4b24c450f 100644 --- a/stats/asr.go +++ b/stats/asr.go @@ -36,6 +36,14 @@ type ASR struct { Count float64 } +func (asr *ASR) GetValue() (v interface{}) { + if asr.Count == 0 { + return float64(engine.STATS_NA) + } + return utils.Round((asr.Answered / asr.Count * 100), + config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE) +} + func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) { if asr.Count == 0 { return utils.NOT_AVAILABLE @@ -44,12 +52,8 @@ func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) { return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed } -func (asr *ASR) GetValue() (v interface{}) { - if asr.Count == 0 { - return float64(engine.STATS_NA) - } - return utils.Round((asr.Answered / asr.Count * 100), - config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE) +func (asr *ASR) GetFloat64Value() (val float64) { + return asr.GetValue().(float64) } func (asr *ASR) AddEvent(ev engine.StatsEvent) (err error) { diff --git a/stats/metric.go b/stats/metric.go index abd01a31b..51ee293dd 100644 --- a/stats/metric.go +++ b/stats/metric.go @@ -40,8 +40,9 @@ func NewStatsMetric(metricID string) (sm StatsMetric, err error) { // StatsMetric is the interface which a metric should implement type StatsMetric interface { - GetStringValue(fmtOpts string) (val string) GetValue() interface{} + GetStringValue(fmtOpts string) (val string) + GetFloat64Value() (val float64) AddEvent(ev engine.StatsEvent) error RemEvent(ev engine.StatsEvent) error GetMarshaled(ms engine.Marshaler) (vals []byte, err error)