From 683430ef4c94d41a36265baedc57f3c29ace469f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 30 Jan 2019 11:58:01 +0200 Subject: [PATCH] Replaced InFieldSerator for stats metrics --- apier/v1/stats_it_test.go | 32 +++---- engine/model_helpers.go | 2 +- engine/statmetrics.go | 3 +- engine/version.go | 2 +- migrator/stats.go | 162 +++++++++++++++++++++++------------ migrator/stats_test.go | 37 ++++++++ migrator/tp_stats_it_test.go | 2 +- utils/consts.go | 1 + utils/coreutils.go | 8 ++ 9 files changed, 175 insertions(+), 74 deletions(-) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 50ae471ab..13fa75924 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -155,10 +155,10 @@ func testV1STSGetStats(t *testing.T) { utils.MetaTCD: utils.NOT_AVAILABLE, utils.MetaACC: utils.NOT_AVAILABLE, utils.MetaPDD: utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE, } if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: expectedIDs[0]}, &metrics); err != nil { @@ -194,10 +194,10 @@ func testV1STSProcessEvent(t *testing.T) { utils.MetaTCD: utils.NOT_AVAILABLE, utils.MetaACC: utils.NOT_AVAILABLE, utils.MetaPDD: utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Usage): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaAverage, utils.Usage): utils.NOT_AVAILABLE, } var metrics map[string]string if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, @@ -240,10 +240,10 @@ func testV1STSProcessEvent(t *testing.T) { utils.MetaTCD: "3m0s", utils.MetaTCC: "123", utils.MetaPDD: "4s", - utils.ConcatenatedKey(utils.MetaSum, utils.Value): "0", - utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Usage): "180000000000", - utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): "90000000000", + utils.StatsJoin(utils.MetaSum, utils.Value): "0", + utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Usage): "180000000000", + utils.StatsJoin(utils.MetaAverage, utils.Usage): "90000000000", } var metrics2 map[string]string if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil { @@ -273,10 +273,10 @@ func testV1STSGetStatsAfterRestart(t *testing.T) { utils.MetaTCD: "3m0s", utils.MetaTCC: "123", utils.MetaPDD: "4s", - utils.ConcatenatedKey(utils.MetaSum, utils.Value): "0", - utils.ConcatenatedKey(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, - utils.ConcatenatedKey(utils.MetaSum, utils.Usage): "180000000000", - utils.ConcatenatedKey(utils.MetaAverage, utils.Usage): "90000000000", + utils.StatsJoin(utils.MetaSum, utils.Value): "0", + utils.StatsJoin(utils.MetaAverage, utils.Value): utils.NOT_AVAILABLE, + utils.StatsJoin(utils.MetaSum, utils.Usage): "180000000000", + utils.StatsJoin(utils.MetaAverage, utils.Usage): "90000000000", } var metrics2 map[string]string if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil { diff --git a/engine/model_helpers.go b/engine/model_helpers.go index e380d6a32..0181c3005 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1625,7 +1625,7 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { paramSplit := strings.Split(tp.Parameters, utils.INFIELD_SEP) for _, param := range paramSplit { metricmap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][utils.ConcatenatedKey(metric, param)] = &utils.MetricWithParams{ - MetricID: utils.ConcatenatedKey(metric, param), Parameters: param} + MetricID: utils.StatsJoin(metric, param), Parameters: param} } } else { metricmap[(&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()][metric] = &utils.MetricWithParams{ diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 4b64f3105..65dad755f 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -21,7 +21,6 @@ package engine import ( "fmt" "strconv" - "strings" "time" "github.com/cgrates/cgrates/config" @@ -45,7 +44,7 @@ func NewStatMetric(metricID string, minItems int, extraParams string) (sm StatMe utils.MetaSum: NewStatSum, utils.MetaAverage: NewStatAverage, } - metricType := strings.Split(metricID, utils.InInFieldSep)[0] + metricType := utils.SplitStats(metricID)[0] if _, has := metrics[metricType]; !has { return nil, fmt.Errorf("unsupported metric type <%s>", metricType) } diff --git a/engine/version.go b/engine/version.go index 62450100d..7cf3217bb 100644 --- a/engine/version.go +++ b/engine/version.go @@ -136,7 +136,7 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st func CurrentDataDBVersions() Versions { return Versions{ - utils.StatS: 2, + utils.StatS: 3, utils.Accounts: 3, utils.Actions: 2, utils.ActionTriggers: 2, diff --git a/migrator/stats.go b/migrator/stats.go index 7966d0804..0de565582 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -59,12 +59,34 @@ type v1Stat struct { type v1Stats []*v1Stat +func (m *Migrator) moveStatQueueProfile() (err error) { + //StatQueueProfile + tenant := config.CgrConfig().GeneralCfg().DefaultTenant + var ids []string + if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix); err != nil { + return err + } + for _, id := range ids { + idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":") + sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, false, false, utils.NonTransactional) + if err != nil { + return err + } + if sgs == nil || m.dryRun { + continue + } + if err = m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { + return err + } + } + return +} + func (m *Migrator) migrateCurrentStats() (err error) { var ids []string tenant := config.CgrConfig().GeneralCfg().DefaultTenant //StatQueue - ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix) - if err != nil { + if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil { return err } for _, id := range ids { @@ -74,36 +96,16 @@ func (m *Migrator) migrateCurrentStats() (err error) { return err } - if sgs != nil { - if m.dryRun != true { - if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil { - return err - } - m.stats[utils.StatS] += 1 - } + if sgs == nil || m.dryRun { + continue } - } - //StatQueueProfile - ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix) - if err != nil { - return err - } - for _, id := range ids { - idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":") - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, false, false, utils.NonTransactional) - if err != nil { + if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil { return err } - if sgs != nil { - if m.dryRun != true { - if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { - return err - } - } - } + m.stats[utils.StatS] += 1 } - return + return m.moveStatQueueProfile() } func (m *Migrator) migrateV1CDRSTATS() (err error) { @@ -129,29 +131,86 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) { if err != nil { return err } - if !m.dryRun { - if err := m.dmOut.DataManager().SetFilter(filter); err != nil { - return err - } - if err := m.dmOut.DataManager().SetStatQueue(sq); err != nil { - return err - } - if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { - return err - } - m.stats[utils.StatS] += 1 + if m.dryRun { + continue } + if err := m.dmOut.DataManager().SetFilter(filter); err != nil { + return err + } + if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(sq)); err != nil { + return err + } + if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { + return err + } + m.stats[utils.StatS] += 1 } } - if m.dryRun != true { - // All done, update version wtih current one - vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) + if m.dryRun { + return + } + // All done, update version wtih current one + vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) + } + return +} + +func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) { + out = &engine.StatQueue{ + Tenant: sq.Tenant, + ID: sq.ID, + SQItems: sq.SQItems, + SQMetrics: make(map[string]engine.StatMetric), + MinItems: sq.MinItems, + } + for mId, metric := range sq.SQMetrics { + id := utils.StatsJoin(utils.SplitConcatenatedKey(mId)...) + out.SQMetrics[id] = metric + } + return +} + +func (m *Migrator) migrateV2Stats() (err error) { + var ids []string + tenant := config.CgrConfig().GeneralCfg().DefaultTenant + //StatQueue + if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil { + return err + } + for _, id := range ids { + idg := strings.TrimPrefix(id, utils.StatQueuePrefix+tenant+":") + sgs, err := m.dmIN.DataManager().GetStatQueue(tenant, idg, false, false, utils.NonTransactional) + if err != nil { + + return err } + if sgs == nil || m.dryRun { + continue + } + if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil { + return err + } + m.stats[utils.StatS] += 1 + } + + if err = m.moveStatQueueProfile(); err != nil { + return err + } + if m.dryRun { + return + } + // All done, update version wtih current one + vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) } return } @@ -173,17 +232,14 @@ func (m *Migrator) migrateStats() (err error) { } switch vrs[utils.StatS] { case 1: - if err := m.migrateV1CDRSTATS(); err != nil { - return err - } + return m.migrateV1CDRSTATS() + case 2: + return m.migrateV2Stats() case current[utils.StatS]: if m.sameDataDB { return } - if err := m.migrateCurrentStats(); err != nil { - return err - } - return + return m.migrateCurrentStats() } return } diff --git a/migrator/stats_test.go b/migrator/stats_test.go index cb48be9e4..f4c15b17a 100644 --- a/migrator/stats_test.go +++ b/migrator/stats_test.go @@ -144,3 +144,40 @@ func TestV1StatsAsStats(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", filter, fltr) } } + +func TestRemakeQueue(t *testing.T) { + sq := &engine.StatQueue{ + Tenant: "cgrates.org", + ID: "StatsID", + SQItems: []struct { + EventID string + ExpiryTime *time.Time + }{ + { + EventID: "ev1", + }, + }, + SQMetrics: map[string]engine.StatMetric{ + "*tcc": nil, + "*sum:Usage": nil, + "*avreage:Cost": nil, + }, + MinItems: 2, + } + expected := &engine.StatQueue{ + Tenant: sq.Tenant, + ID: sq.ID, + SQItems: sq.SQItems, + SQMetrics: map[string]engine.StatMetric{ + "*tcc": nil, + "*sum#Usage": nil, + "*avreage#Cost": nil, + }, + MinItems: sq.MinItems, + } + + if rply := remakeQueue(sq); !reflect.DeepEqual(expected, rply) { + t.Errorf("Expecting: %+v, received: %+v", expected, rply) + } + return +} diff --git a/migrator/tp_stats_it_test.go b/migrator/tp_stats_it_test.go index b3b488423..8c682670b 100644 --- a/migrator/tp_stats_it_test.go +++ b/migrator/tp_stats_it_test.go @@ -154,7 +154,7 @@ func testTpStatsITCheckData(t *testing.T) { if err != nil { t.Error("Error when getting TpStat ", err.Error()) } - tpStats[0].Metrics[0].MetricID = "*sum:Param1" //add parametrics to metricID to use multiple parameters for same metric + tpStats[0].Metrics[0].MetricID = "*sum#Param1" //add parametrics to metricID to use multiple parameters for same metric if !reflect.DeepEqual(tpStats[0], result[0]) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tpStats[0]), utils.ToJSON(result[0])) diff --git a/utils/consts.go b/utils/consts.go index 534c6a00f..ec082e0a6 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -128,6 +128,7 @@ const ( ZERO = "*zero" ASAP = "*asap" USERS = "*users" + STATS_CHAR = "#" COMMENT_CHAR = '#' CSV_SEP = ',' FALLBACK_SEP = ';' diff --git a/utils/coreutils.go b/utils/coreutils.go index 79e1581f9..ce22005b9 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -344,6 +344,14 @@ func SplitConcatenatedKey(key string) []string { return strings.Split(key, CONCATENATED_KEY_SEP) } +func StatsJoin(keyVals ...string) string { + return strings.Join(keyVals, STATS_CHAR) +} + +func SplitStats(key string) []string { + return strings.Split(key, STATS_CHAR) +} + func LCRKey(direction, tenant, category, account, subject string) string { return ConcatenatedKey(direction, tenant, category, account, subject) }