Refactored and optimized code

This commit is contained in:
edwardro22
2017-12-27 12:33:20 +02:00
committed by Dan Christian Bogos
parent bd8ba22ca7
commit 5cf3cd59ec
8 changed files with 83 additions and 129 deletions

View File

@@ -47,7 +47,6 @@ func (sqp *StatQueueProfile) TenantID() string {
// NewStoredStatQueue initiates a StoredStatQueue out of StatQueue
func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) {
marshaledMetrics := make(map[string]map[string][]byte)
sSQ = &StoredStatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
@@ -55,27 +54,19 @@ func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err
EventID string
ExpiryTime *time.Time
}, len(sq.SQItems)),
MinItems: sq.MinItems,
SQMetrics: make(map[string][]byte, len(sq.SQMetrics)),
MinItems: sq.MinItems,
}
for i, sqItm := range sq.SQItems {
sSQ.SQItems[i] = sqItm
}
for metricID, _ := range sq.SQMetrics {
for parameter, metric := range sq.SQMetrics[metricID] {
if marshaled, err := metric.Marshal(ms); err != nil {
return nil, err
} else {
if _, hasIt := marshaledMetrics[metricID]; !hasIt {
marshaledMetrics[metricID] = make(map[string][]byte)
}
if _, hasIt := marshaledMetrics[metricID][parameter]; !hasIt {
marshaledMetrics[metricID][parameter] = make([]byte, len(marshaled))
}
marshaledMetrics[metricID][parameter] = marshaled
}
for metricID, metric := range sq.SQMetrics {
if marshaled, err := metric.Marshal(ms); err != nil {
return nil, err
} else {
sSQ.SQMetrics[metricID] = marshaled
}
}
sSQ.SQMetrics = marshaledMetrics
return
}
@@ -87,7 +78,7 @@ type StoredStatQueue struct {
EventID string // Bounded to the original utils.CGREvent
ExpiryTime *time.Time // Used to auto-expire events
}
SQMetrics map[string]map[string][]byte
SQMetrics map[string][]byte
MinItems int
}
@@ -98,7 +89,6 @@ func (ssq *StoredStatQueue) SqID() string {
// AsStatQueue converts into StatQueue unmarshaling SQMetrics
func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) {
SQMetrics := make(map[string]map[string]StatMetric)
sq = &StatQueue{
Tenant: ssq.Tenant,
ID: ssq.ID,
@@ -106,26 +96,21 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error)
EventID string
ExpiryTime *time.Time
}, len(ssq.SQItems)),
MinItems: ssq.MinItems,
SQMetrics: make(map[string]StatMetric, len(ssq.SQMetrics)),
MinItems: ssq.MinItems,
}
for i, sqItm := range ssq.SQItems {
sq.SQItems[i] = sqItm
}
for metricID, _ := range ssq.SQMetrics {
for parameter, marshaled := range ssq.SQMetrics[metricID] {
if metric, err := NewStatMetric(metricID, ssq.MinItems, parameter); err != nil {
return nil, err
} else if err := metric.LoadMarshaled(ms, marshaled); err != nil {
return nil, err
} else {
if _, hasIt := SQMetrics[metricID]; !hasIt {
SQMetrics[metricID] = make(map[string]StatMetric)
}
SQMetrics[metricID][parameter] = metric
}
for metricID, marshaled := range ssq.SQMetrics {
if metric, err := NewStatMetric(metricID, ssq.MinItems, ""); err != nil {
return nil, err
} else if err := metric.LoadMarshaled(ms, marshaled); err != nil {
return nil, err
} else {
sq.SQMetrics[metricID] = metric
}
}
sq.SQMetrics = SQMetrics
return
}
@@ -137,7 +122,7 @@ type StatQueue struct {
EventID string // Bounded to the original utils.CGREvent
ExpiryTime *time.Time // Used to auto-expire events
}
SQMetrics map[string]map[string]StatMetric
SQMetrics map[string]StatMetric
MinItems int
sqPrfl *StatQueueProfile
dirty *bool // needs save
@@ -159,11 +144,9 @@ func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent) (err error) {
// remStatEvent removes an event from metrics
func (sq *StatQueue) remEventWithID(evTenantID string) {
for metricID, _ := range sq.SQMetrics {
for _, metric := range sq.SQMetrics[metricID] {
if err := metric.RemEvent(evTenantID); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evTenantID, err.Error()))
}
for metricID, metric := range sq.SQMetrics {
if err := metric.RemEvent(evTenantID); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evTenantID, err.Error()))
}
}
}
@@ -201,12 +184,10 @@ func (sq *StatQueue) remOnQueueLength() {
// addStatEvent computes metrics for an event
func (sq *StatQueue) addStatEvent(ev *utils.CGREvent) {
for metricID, _ := range sq.SQMetrics {
for _, metric := range sq.SQMetrics[metricID] {
if err := metric.AddEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, ev.TenantID(), err.Error()))
}
for metricID, metric := range sq.SQMetrics {
if err := metric.AddEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, ev.TenantID(), err.Error()))
}
}
}

View File

@@ -48,20 +48,18 @@ func TestStatQueuesSort(t *testing.T) {
func TestStatRemEventWithID(t *testing.T) {
sq = &StatQueue{
SQMetrics: map[string]map[string]StatMetric{
utils.MetaASR: map[string]StatMetric{
"": &StatASR{
Answered: 1,
Count: 2,
Events: map[string]bool{
"cgrates.org:TestRemEventWithID_1": true,
"cgrates.org:TestRemEventWithID_2": false,
},
SQMetrics: map[string]StatMetric{
utils.MetaASR: &StatASR{
Answered: 1,
Count: 2,
Events: map[string]bool{
"cgrates.org:TestRemEventWithID_1": true,
"cgrates.org:TestRemEventWithID_2": false,
},
},
},
}
asrMetric := sq.SQMetrics[utils.MetaASR][""].(*StatASR)
asrMetric := sq.SQMetrics[utils.MetaASR].(*StatASR)
if asr := asrMetric.GetFloat64Value(); asr != 50 {
t.Errorf("received asrMetric: %v", asrMetric)
}
@@ -93,16 +91,14 @@ func TestStatRemEventWithID(t *testing.T) {
func TestStatRemExpired(t *testing.T) {
sq = &StatQueue{
SQMetrics: map[string]map[string]StatMetric{
utils.MetaASR: map[string]StatMetric{
"": &StatASR{
Answered: 2,
Count: 3,
Events: map[string]bool{
"cgrates.org:TestStatRemExpired_1": true,
"cgrates.org:TestStatRemExpired_2": false,
"cgrates.org:TestStatRemExpired_3": true,
},
SQMetrics: map[string]StatMetric{
utils.MetaASR: &StatASR{
Answered: 2,
Count: 3,
Events: map[string]bool{
"cgrates.org:TestStatRemExpired_1": true,
"cgrates.org:TestStatRemExpired_2": false,
"cgrates.org:TestStatRemExpired_3": true,
},
},
},
@@ -118,7 +114,7 @@ func TestStatRemExpired(t *testing.T) {
{"cgrates.org:TestStatRemExpired_3", utils.TimePointer(time.Now().Add(time.Duration(time.Minute)))},
},
}
asrMetric := sq.SQMetrics[utils.MetaASR][""].(*StatASR)
asrMetric := sq.SQMetrics[utils.MetaASR].(*StatASR)
if asr := asrMetric.GetFloat64Value(); asr != 66.66667 {
t.Errorf("received asrMetric: %v", asrMetric)
}
@@ -179,19 +175,17 @@ func TestStatRemOnQueueLength(t *testing.T) {
func TestStatAddStatEvent(t *testing.T) {
sq = &StatQueue{
SQMetrics: map[string]map[string]StatMetric{
utils.MetaASR: map[string]StatMetric{
"": &StatASR{
Answered: 1,
Count: 1,
Events: map[string]bool{
"cgrates.org:TestStatRemExpired_1": true,
},
SQMetrics: map[string]StatMetric{
utils.MetaASR: &StatASR{
Answered: 1,
Count: 1,
Events: map[string]bool{
"cgrates.org:TestStatRemExpired_1": true,
},
},
},
}
asrMetric := sq.SQMetrics[utils.MetaASR][""].(*StatASR)
asrMetric := sq.SQMetrics[utils.MetaASR].(*StatASR)
if asr := asrMetric.GetFloat64Value(); asr != 100 {
t.Errorf("received ASR: %v", asr)
}

View File

@@ -917,7 +917,9 @@ func TestTPStatsAsTPStats(t *testing.T) {
} else if !(reflect.DeepEqual(eTPs[1].FilterIDs, rcvTPs[1].FilterIDs) && reflect.DeepEqual(eTPs[0].FilterIDs, rcvTPs[0].FilterIDs)) {
t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs[0].FilterIDs), utils.ToIJSON(rcvTPs[0].FilterIDs))
} else if len(utils.ToIJSON(eTPs[0].Metrics)) != len(utils.ToIJSON(rcvTPs[0].Metrics)) &&
len(utils.ToIJSON(eTPs[1].Metrics)) != len(utils.ToIJSON(rcvTPs[1].Metrics)) {
len(utils.ToIJSON(eTPs[1].Metrics)) != len(utils.ToIJSON(rcvTPs[1].Metrics)) &&
len(utils.ToIJSON(eTPs[1].Metrics)) != len(utils.ToIJSON(rcvTPs[0].Metrics)) &&
len(utils.ToIJSON(eTPs[0].Metrics)) != len(utils.ToIJSON(rcvTPs[1].Metrics)) {
t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs[0].Metrics), utils.ToIJSON(rcvTPs[0].Metrics))
}
if !(reflect.DeepEqual(eTPs[1].TPid, rcvTPs[1].TPid) && reflect.DeepEqual(eTPs[0].TPid, rcvTPs[0].TPid)) {
@@ -927,7 +929,9 @@ func TestTPStatsAsTPStats(t *testing.T) {
} else if !(reflect.DeepEqual(eTPs[1].FilterIDs, rcvTPs[1].FilterIDs) && reflect.DeepEqual(eTPs[0].FilterIDs, rcvTPs[0].FilterIDs)) {
t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs[1].FilterIDs), utils.ToIJSON(rcvTPs[1].FilterIDs))
} else if len(utils.ToIJSON(eTPs[0].Metrics)) != len(utils.ToIJSON(rcvTPs[0].Metrics)) &&
len(utils.ToIJSON(eTPs[0].Metrics)) != len(utils.ToIJSON(rcvTPs[0].Metrics)) {
len(utils.ToIJSON(eTPs[1].Metrics)) != len(utils.ToIJSON(rcvTPs[1].Metrics)) &&
len(utils.ToIJSON(eTPs[1].Metrics)) != len(utils.ToIJSON(rcvTPs[0].Metrics)) &&
len(utils.ToIJSON(eTPs[0].Metrics)) != len(utils.ToIJSON(rcvTPs[1].Metrics)) {
t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs[1].Metrics), utils.ToIJSON(rcvTPs[1].Metrics))
}
}

View File

@@ -649,7 +649,6 @@ func NewStatSum(minItems int, extraParams string) (StatMetric, error) {
type StatSum struct {
Sum float64
Count float64
Events map[string]float64 // map[EventTenantID]Cost
MinItems int
FieldName string
@@ -659,7 +658,7 @@ type StatSum struct {
// getValue returns tcd.val
func (sum *StatSum) getValue() float64 {
if sum.val == nil {
if (sum.MinItems > 0 && len(sum.Events) < sum.MinItems) || (sum.Count == 0) {
if len(sum.Events) == 0 || len(sum.Events) < sum.MinItems {
sum.val = utils.Float64Pointer(STATS_NA)
} else {
sum.val = utils.Float64Pointer(utils.Round(sum.Sum,
@@ -688,19 +687,14 @@ func (sum *StatSum) GetFloat64Value() (v float64) {
func (sum *StatSum) AddEvent(ev *utils.CGREvent) (err error) {
var value float64
if at, err := ev.FieldAsTime(utils.AnswerTime, config.CgrConfig().DefaultTimezone); err != nil {
if val, err := ev.FieldAsFloat64(sum.FieldName); err != nil &&
err != utils.ErrNotFound {
return err
} else if !at.IsZero() {
if val, err := ev.FieldAsFloat64(sum.FieldName); err != nil &&
err != utils.ErrNotFound {
return err
} else if val >= 0 {
value = val
sum.Sum += val
}
} else if val >= 0 {
value = val
sum.Sum += val
}
sum.Events[ev.TenantID()] = value
sum.Count += 1
sum.val = nil
return
}
@@ -713,7 +707,6 @@ func (sum *StatSum) RemEvent(evTenantID string) (err error) {
if val != 0 {
sum.Sum -= val
}
sum.Count -= 1
delete(sum.Events, evTenantID)
sum.val = nil
return
@@ -774,20 +767,16 @@ func (avg *StatAverage) GetFloat64Value() (v float64) {
func (avg *StatAverage) AddEvent(ev *utils.CGREvent) (err error) {
var value float64
if at, err := ev.FieldAsTime(utils.AnswerTime, config.CgrConfig().DefaultTimezone); err != nil {
if val, err := ev.FieldAsFloat64(avg.FieldName); err != nil &&
err != utils.ErrNotFound {
return err
} else if !at.IsZero() {
if val, err := ev.FieldAsFloat64(avg.FieldName); err != nil &&
err != utils.ErrNotFound {
return err
} else if val >= 0 {
value = val
avg.Sum += val
}
} else if val > 0 {
value = val
avg.Sum += val
avg.Events[ev.TenantID()] = value
avg.Count += 1
avg.val = nil
}
avg.Events[ev.TenantID()] = value
avg.Count += 1
avg.val = nil
return
}

View File

@@ -954,7 +954,7 @@ func TestStatSumGetFloat64Value(t *testing.T) {
}
ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "EVENT_2"}
statSum.AddEvent(ev2)
if v := statSum.GetFloat64Value(); v != -1.0 {
if v := statSum.GetFloat64Value(); v != 20.0 {
t.Errorf("wrong statSum value: %v", v)
}
ev4 := &utils.CGREvent{Tenant: "cgrates.org", ID: "EVENT_4",

View File

@@ -237,10 +237,8 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: sq.ID}}
for metricID, _ := range sq.SQMetrics {
for _, metric := range sq.SQMetrics[metricID] {
ev.Event[metricID] = metric.GetValue()
}
for metricID, metric := range sq.SQMetrics {
ev.Event[metricID] = metric.GetValue()
}
var hits int
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil {
@@ -281,26 +279,21 @@ func (sS *StatService) V1GetStatQueuesForEvent(ev *utils.CGREvent, reply *StatQu
// V1GetQueueStringMetrics returns the metrics of a Queue as string values
func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
metricsmap := make(map[string]string)
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
utils.Logger.Debug(fmt.Sprintf("\nGETS b4 get \n"))
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
if err != nil {
if err != utils.ErrNotFound {
utils.Logger.Debug(fmt.Sprintf("\nGETS err not found \n"))
err = utils.NewErrServerError(err)
}
return err
}
for metricID, _ := range sq.SQMetrics {
for _, metric := range sq.SQMetrics[metricID] {
metricsmap[metricID] = metric.GetStringValue("")
utils.Logger.Debug(fmt.Sprintf("GETS HERE: %+v", metricsmap[metricID]))
}
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue("")
}
*reply = metricsmap
*reply = metrics
return
}
@@ -317,10 +310,8 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
return err
}
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, _ := range sq.SQMetrics {
for _, metric := range sq.SQMetrics[metricID] {
metrics[metricID] = metric.GetFloat64Value()
}
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetFloat64Value()
}
*reply = metrics
return

View File

@@ -2219,16 +2219,14 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("StatQueues:")
}
for _, sqTntID := range tpr.statQueues {
metrics := make(map[string]map[string]StatMetric)
metrics := make(map[string]StatMetric)
for _, metricwithparam := range tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].Metrics {
if metric, err := NewStatMetric(metricwithparam.MetricID,
tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].MinItems, metricwithparam.Parameters); err != nil {
return err
} else {
if _, hasIt := metrics[metricwithparam.MetricID]; !hasIt {
metrics[metricwithparam.MetricID] = make(map[string]StatMetric)
}
metrics[metricwithparam.MetricID][metricwithparam.Parameters] = metric
metrics[metricwithparam.MetricID] = metric
}
}
sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics}

View File

@@ -348,7 +348,7 @@ func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq
}
sq = &engine.StatQueue{Tenant: config.CgrConfig().DefaultTenant,
ID: v1Sts.Id,
SQMetrics: make(map[string]map[string]engine.StatMetric),
SQMetrics: make(map[string]engine.StatMetric),
}
if len(v1Sts.Metrics) != 0 {
for i, _ := range v1Sts.Metrics {
@@ -361,10 +361,7 @@ func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq
if metric, err := engine.NewStatMetric(stq.Metrics[i].MetricID, 0, ""); err != nil {
return nil, nil, nil, err
} else {
if _, has := sq.SQMetrics[stq.Metrics[i].MetricID]; !has {
sq.SQMetrics[stq.Metrics[i].MetricID] = make(map[string]engine.StatMetric)
}
sq.SQMetrics[stq.Metrics[i].MetricID][""] = metric
sq.SQMetrics[stq.Metrics[i].MetricID] = metric
}
}
}