Replaced InFieldSerator for stats metrics

This commit is contained in:
Trial97
2019-01-30 11:58:01 +02:00
committed by Dan Christian Bogos
parent 16ee033e26
commit 683430ef4c
9 changed files with 175 additions and 74 deletions

View File

@@ -155,10 +155,10 @@ func testV1STSGetStats(t *testing.T) {
utils.MetaTCD: utils.NOT_AVAILABLE,
utils.MetaACC: utils.NOT_AVAILABLE,
utils.MetaPDD: utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE,
}
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics,
&utils.TenantID{Tenant: "cgrates.org", ID: expectedIDs[0]}, &metrics); err != nil {
@@ -194,10 +194,10 @@ func testV1STSProcessEvent(t *testing.T) {
utils.MetaTCD: utils.NOT_AVAILABLE,
utils.MetaACC: utils.NOT_AVAILABLE,
utils.MetaPDD: utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE,
}
var metrics map[string]string
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics,
@@ -240,10 +240,10 @@ func testV1STSProcessEvent(t *testing.T) {
utils.MetaTCD: "3m0s",
utils.MetaTCC: "123",
utils.MetaPDD: "4s",
utils.ConcatenatedKey(utils.MetaSum, utils.Value): "0",
utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Usage): "180000000000",
utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): "90000000000",
utils.StatsJoin(utils.MetaSum, utils.Value): "0",
utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Usage): "180000000000",
utils.StatsJoin(utils.MetaAverage, utils.Usage): "90000000000",
}
var metrics2 map[string]string
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil {
@@ -273,10 +273,10 @@ func testV1STSGetStatsAfterRestart(t *testing.T) {
utils.MetaTCD: "3m0s",
utils.MetaTCC: "123",
utils.MetaPDD: "4s",
utils.ConcatenatedKey(utils.MetaSum, utils.Value): "0",
utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.ConcatenatedKey(utils.MetaSum, utils.Usage): "180000000000",
utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): "90000000000",
utils.StatsJoin(utils.MetaSum, utils.Value): "0",
utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE,
utils.StatsJoin(utils.MetaSum, utils.Usage): "180000000000",
utils.StatsJoin(utils.MetaAverage, utils.Usage): "90000000000",
}
var metrics2 map[string]string
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil {

View File

@@ -1625,7 +1625,7 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
paramSplit := strings.Split(tp.Parameters, utils.INFIELD_SEP)
for _, param := range paramSplit {
metricmap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][utils.ConcatenatedKey(metric, param)] = &utils.MetricWithParams{
MetricID: utils.ConcatenatedKey(metric, param), Parameters: param}
MetricID: utils.StatsJoin(metric, param), Parameters: param}
}
} else {
metricmap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][metric] = &utils.MetricWithParams{

View File

@@ -21,7 +21,6 @@ package engine
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -45,7 +44,7 @@ func NewStatMetric(metricID string, minItems int, extraParams string) (sm StatMe
utils.MetaSum: NewStatSum,
utils.MetaAverage: NewStatAverage,
}
metricType := strings.Split(metricID, utils.InInFieldSep)[0]
metricType := utils.SplitStats(metricID)[0]
if _, has := metrics[metricType]; !has {
return nil, fmt.Errorf("unsupported metric type <%s>", metricType)
}

View File

@@ -136,7 +136,7 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st
func CurrentDataDBVersions() Versions {
return Versions{
utils.StatS: 2,
utils.StatS: 3,
utils.Accounts: 3,
utils.Actions: 2,
utils.ActionTriggers: 2,

View File

@@ -59,12 +59,34 @@ type v1Stat struct {
type v1Stats []*v1Stat
func (m *Migrator) moveStatQueueProfile() (err error) {
//StatQueueProfile
tenant := config.CgrConfig().GeneralCfg().DefaultTenant
var ids []string
if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix); err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":")
sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, false, false, utils.NonTransactional)
if err != nil {
return err
}
if sgs == nil || m.dryRun {
continue
}
if err = m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil {
return err
}
}
return
}
func (m *Migrator) migrateCurrentStats() (err error) {
var ids []string
tenant := config.CgrConfig().GeneralCfg().DefaultTenant
//StatQueue
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix)
if err != nil {
if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil {
return err
}
for _, id := range ids {
@@ -74,36 +96,16 @@ func (m *Migrator) migrateCurrentStats() (err error) {
return err
}
if sgs != nil {
if m.dryRun != true {
if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil {
return err
}
m.stats[utils.StatS] += 1
}
if sgs == nil || m.dryRun {
continue
}
}
//StatQueueProfile
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":")
sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, false, false, utils.NonTransactional)
if err != nil {
if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil {
return err
}
if sgs != nil {
if m.dryRun != true {
if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil {
return err
}
}
}
m.stats[utils.StatS] += 1
}
return
return m.moveStatQueueProfile()
}
func (m *Migrator) migrateV1CDRSTATS() (err error) {
@@ -129,29 +131,86 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) {
if err != nil {
return err
}
if !m.dryRun {
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueue(sq); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil {
return err
}
m.stats[utils.StatS] += 1
if m.dryRun {
continue
}
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(sq)); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil {
return err
}
m.stats[utils.StatS] += 1
}
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
if m.dryRun {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
return
}
func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) {
out = &engine.StatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: sq.SQItems,
SQMetrics: make(map[string]engine.StatMetric),
MinItems: sq.MinItems,
}
for mId, metric := range sq.SQMetrics {
id := utils.StatsJoin(utils.SplitConcatenatedKey(mId)...)
out.SQMetrics[id] = metric
}
return
}
func (m *Migrator) migrateV2Stats() (err error) {
var ids []string
tenant := config.CgrConfig().GeneralCfg().DefaultTenant
//StatQueue
if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.StatQueuePrefix+tenant+":")
sgs, err := m.dmIN.DataManager().GetStatQueue(tenant, idg, false, false, utils.NonTransactional)
if err != nil {
return err
}
if sgs == nil || m.dryRun {
continue
}
if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil {
return err
}
m.stats[utils.StatS] += 1
}
if err = m.moveStatQueueProfile(); err != nil {
return err
}
if m.dryRun {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
return
}
@@ -173,17 +232,14 @@ func (m *Migrator) migrateStats() (err error) {
}
switch vrs[utils.StatS] {
case 1:
if err := m.migrateV1CDRSTATS(); err != nil {
return err
}
return m.migrateV1CDRSTATS()
case 2:
return m.migrateV2Stats()
case current[utils.StatS]:
if m.sameDataDB {
return
}
if err := m.migrateCurrentStats(); err != nil {
return err
}
return
return m.migrateCurrentStats()
}
return
}

View File

@@ -144,3 +144,40 @@ func TestV1StatsAsStats(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", filter, fltr)
}
}
func TestRemakeQueue(t *testing.T) {
sq := &engine.StatQueue{
Tenant: "cgrates.org",
ID: "StatsID",
SQItems: []struct {
EventID string
ExpiryTime *time.Time
}{
{
EventID: "ev1",
},
},
SQMetrics: map[string]engine.StatMetric{
"*tcc": nil,
"*sum:Usage": nil,
"*avreage:Cost": nil,
},
MinItems: 2,
}
expected := &engine.StatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: sq.SQItems,
SQMetrics: map[string]engine.StatMetric{
"*tcc": nil,
"*sum#Usage": nil,
"*avreage#Cost": nil,
},
MinItems: sq.MinItems,
}
if rply := remakeQueue(sq); !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting: %+v, received: %+v", expected, rply)
}
return
}

View File

@@ -154,7 +154,7 @@ func testTpStatsITCheckData(t *testing.T) {
if err != nil {
t.Error("Error when getting TpStat ", err.Error())
}
tpStats[0].Metrics[0].MetricID = "*sum:Param1" //add parametrics to metricID to use multiple parameters for same metric
tpStats[0].Metrics[0].MetricID = "*sum#Param1" //add parametrics to metricID to use multiple parameters for same metric
if !reflect.DeepEqual(tpStats[0], result[0]) {
t.Errorf("Expecting: %+v, received: %+v",
utils.ToJSON(tpStats[0]), utils.ToJSON(result[0]))

View File

@@ -128,6 +128,7 @@ const (
ZERO = "*zero"
ASAP = "*asap"
USERS = "*users"
STATS_CHAR = "#"
COMMENT_CHAR = '#'
CSV_SEP = ','
FALLBACK_SEP = ';'

View File

@@ -344,6 +344,14 @@ func SplitConcatenatedKey(key string) []string {
return strings.Split(key, CONCATENATED_KEY_SEP)
}
func StatsJoin(keyVals ...string) string {
return strings.Join(keyVals, STATS_CHAR)
}
func SplitStats(key string) []string {
return strings.Split(key, STATS_CHAR)
}
func LCRKey(direction, tenant, category, account, subject string) string {
return ConcatenatedKey(direction, tenant, category, account, subject)
}