diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 2d1717de9..f01354083 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -167,16 +167,17 @@ func TestStatSV1ProcessEvent(t *testing.T) { } } -/* func TestStatSV1StopEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) } } -*/ // BenchmarkStatSV1SetEvent 5000 263437 ns/op func BenchmarkStatSV1SetEvent(b *testing.B) { + if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil { + b.Fatal(err) + } b.StopTimer() var err error stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed diff --git a/cache/cache_test.go b/cache/cache_test.go index 459b434f0..7ec24eccb 100755 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -51,7 +51,6 @@ func TestTransaction(t *testing.T) { } CommitTransaction(transID) if t1, ok := Get("mmm_t12"); !ok || t1 != "test" { - fmt.Println(t1, ok) t.Error("Error commiting transaction") } if t1, ok := Get("mmm_t11"); ok || t1 == "test" { diff --git a/engine/cdr.go b/engine/cdr.go index 0df10b469..bec027dc7 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -793,8 +793,38 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, g } // Part of event interface -func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) { - return nil, utils.ErrNotImplemented +func (cdr *CDR) AsMapStringIface() (mp map[string]interface{}, err error) { + mp = make(map[string]interface{}) + for k, v := range cdr.ExtraFields { + mp[k] = v + } + mp[utils.CGRID] = cdr.CGRID + mp[utils.MEDI_RUNID] = cdr.RunID + mp[utils.ORDERID] = cdr.OrderID + mp[utils.CDRHOST] = cdr.OriginHost + mp[utils.CDRSOURCE] = cdr.Source + mp[utils.ACCID] = cdr.OriginID + mp[utils.TOR] = cdr.ToR + mp[utils.REQTYPE] = cdr.RequestType + mp[utils.DIRECTION] = cdr.Direction + mp[utils.TENANT] = cdr.Tenant + mp[utils.CATEGORY] = cdr.Category + mp[utils.ACCOUNT] = cdr.Account + mp[utils.SUBJECT] = cdr.Subject + mp[utils.DESTINATION] = cdr.Destination + mp[utils.SETUP_TIME] = cdr.SetupTime + mp[utils.PDD] = cdr.PDD + mp[utils.ANSWER_TIME] = cdr.AnswerTime + mp[utils.USAGE] = cdr.Usage + mp[utils.SUPPLIER] = cdr.Supplier + mp[utils.DISCONNECT_CAUSE] = cdr.DisconnectCause + mp[utils.CostSource] = cdr.CostSource + mp[utils.COST] = cdr.Cost + mp[utils.COST_DETAILS] = cdr.CostDetails + mp[utils.ExtraInfo] = cdr.ExtraInfo + mp[utils.RATED] = cdr.Rated + mp[utils.PartialField] = cdr.Partial + return } // Used in place where we need to export the CDR based on an export template diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 6df0aba22..6d800d99e 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -757,7 +757,7 @@ func TestAPItoResourceLimit(t *testing.T) { Filters: []*utils.TPRequestFilter{ &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, &utils.TPRequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"10", "20"}}, - &utils.TPRequestFilter{Type: MetaStatS, Values: []string{"CDRST1:*min_ASR:34", "CDRST_1001:*min_ASR:20"}}, + &utils.TPRequestFilter{Type: MetaStatS, Values: []string{"CDRST1:*min_asr:34", "CDRST_1001:*min_asr:20"}}, &utils.TPRequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}}, }, ActivationInterval: &utils.TPActivationInterval{ActivationTime: "2014-07-29T15:00:00Z"}, @@ -778,10 +778,10 @@ func TestAPItoResourceLimit(t *testing.T) { eRL.Filters[1] = &RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"10", "20"}} eRL.Filters[2] = &RequestFilter{Type: MetaStatS, - Values: []string{"CDRST1:*min_ASR:34", "CDRST_1001:*min_ASR:20"}, + Values: []string{"CDRST1:*min_asr:34", "CDRST_1001:*min_asr:20"}, statSThresholds: []*RFStatSThreshold{ - &RFStatSThreshold{QueueID: "CDRST1", ThresholdType: "*MIN_ASR", ThresholdValue: 34}, - &RFStatSThreshold{QueueID: "CDRST_1001", ThresholdType: "*MIN_ASR", ThresholdValue: 20}, + &RFStatSThreshold{QueueID: "CDRST1", ThresholdType: "*min_asr", ThresholdValue: 34}, + &RFStatSThreshold{QueueID: "CDRST_1001", ThresholdType: "*min_asr", ThresholdValue: 20}, }} eRL.Filters[3] = &RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP), diff --git a/engine/reqfilter.go b/engine/reqfilter.go index 5fd098dad..9194e36f6 100644 --- a/engine/reqfilter.go +++ b/engine/reqfilter.go @@ -20,6 +20,7 @@ package engine import ( "errors" "fmt" + "reflect" "strconv" "strings" @@ -34,8 +35,8 @@ const ( MetaRSRFields = "*rsr_fields" MetaStatS = "*stats" MetaDestinations = "*destinations" - MetaMinCapPrefix = "*MIN_" - MetaMaxCapPrefix = "*MAX_" + MetaMinCapPrefix = "*min_" + MetaMaxCapPrefix = "*max_" ) func NewRequestFilter(rfType, fieldName string, vals []string) (*RequestFilter, error) { @@ -84,7 +85,7 @@ func (rf *RequestFilter) CompileValues() (err error) { if len(valSplt) != 3 { return fmt.Errorf("Value %s needs to contain at least 3 items", val) } - st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: strings.ToUpper(valSplt[1])} + st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: valSplt[1]} if len(st.ThresholdType) < len(MetaMinCapPrefix)+1 { return fmt.Errorf("Value %s contains a unsupported ThresholdType format", val) } else if !strings.HasPrefix(st.ThresholdType, MetaMinCapPrefix) && !strings.HasPrefix(st.ThresholdType, MetaMaxCapPrefix) { @@ -102,7 +103,7 @@ func (rf *RequestFilter) CompileValues() (err error) { } // Pass is the method which should be used from outside. -func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) { +func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, rpcClnt rpcclient.RpcClientConnection) (bool, error) { switch fltr.Type { case MetaString: return fltr.passString(req, extraFieldsLabel) @@ -115,7 +116,7 @@ func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrSta case MetaRSRFields: return fltr.passRSRFields(req, extraFieldsLabel) case MetaStatS: - return fltr.passStatS(req, extraFieldsLabel, cdrStats) + return fltr.passStatS(req, extraFieldsLabel, rpcClnt) default: return false, utils.ErrNotImplemented } @@ -194,20 +195,24 @@ func (fltr *RequestFilter) passRSRFields(req interface{}, extraFieldsLabel strin return false, nil } -func (fltr *RequestFilter) passStatS(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) { - if cdrStats == nil { - return false, errors.New("Missing CDRStatS information") +func (fltr *RequestFilter) passStatS(req interface{}, extraFieldsLabel string, stats rpcclient.RpcClientConnection) (bool, error) { + if stats == nil || reflect.ValueOf(stats).IsNil() { + return false, errors.New("Missing StatS information") } for _, threshold := range fltr.statSThresholds { statValues := make(map[string]float64) - if err := cdrStats.Call("CDRStatsV1.GetValues", threshold.QueueID, &statValues); err != nil { + if err := stats.Call("StatSV1.GetFloatMetrics", threshold.QueueID, &statValues); err != nil { return false, err } - if val, hasIt := statValues[threshold.ThresholdType[len(MetaMinCapPrefix):]]; !hasIt { + val, hasIt := statValues[utils.MetaPrefix+threshold.ThresholdType[len(MetaMinCapPrefix):]] + if !hasIt { continue - } else if strings.HasPrefix(threshold.ThresholdType, MetaMinCapPrefix) && val >= threshold.ThresholdValue { + } + if strings.HasPrefix(threshold.ThresholdType, MetaMinCapPrefix) && + val >= threshold.ThresholdValue { return true, nil - } else if strings.HasPrefix(threshold.ThresholdType, MetaMaxCapPrefix) && val < threshold.ThresholdValue { + } else if strings.HasPrefix(threshold.ThresholdType, MetaMaxCapPrefix) && + val < threshold.ThresholdValue { return true, nil } } diff --git a/engine/reqfilter_test.go b/engine/reqfilter_test.go index 50293b83d..b980e73ef 100644 --- a/engine/reqfilter_test.go +++ b/engine/reqfilter_test.go @@ -25,7 +25,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func TestPassString(t *testing.T) { +func TestReqFilterPassString(t *testing.T) { cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963", TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC), DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}} @@ -43,7 +43,7 @@ func TestPassString(t *testing.T) { } } -func TestPassStringPrefix(t *testing.T) { +func TestReqFilterPassStringPrefix(t *testing.T) { cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963", TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC), DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}} @@ -85,7 +85,7 @@ func TestPassStringPrefix(t *testing.T) { } } -func TestPassRSRFields(t *testing.T) { +func TestReqFilterPassRSRFields(t *testing.T) { cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963", TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC), DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}} @@ -118,7 +118,7 @@ func TestPassRSRFields(t *testing.T) { } } -func TestPassDestinations(t *testing.T) { +func TestReqFilterPassDestinations(t *testing.T) { cache.Set(utils.REVERSE_DESTINATION_PREFIX+"+49", []string{"DE", "EU_LANDLINE"}, true, "") cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963", TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC), @@ -142,35 +142,3 @@ func TestPassDestinations(t *testing.T) { t.Error("Passing") } } - -func TestPassStatS(t *testing.T) { - /* #FixMe - cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963", - TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC), - DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}} - statS := NewStats(dataStorage, 0) - cdr := &CDR{ - Tenant: "cgrates.org", - Category: "call", - AnswerTime: time.Now(), - SetupTime: time.Now(), - Usage: 10 * time.Second, - Cost: 10, - Supplier: "suppl1", - DisconnectCause: "NORMAL_CLEARNING", - } - err := statS.AppendCDR(cdr, nil) - if err != nil { - t.Error("Error appending cdr to stats: ", err) - } - rf, err := NewRequestFilter(MetastatS, "", []string{"CDRST1:*min_asr:20", "CDRST2:*min_acd:10"}) - if err != nil { - t.Fatal(err) - } - if passes, err := rf.passStatS(cd, "ExtraFields", statS); err != nil { - t.Error(err) - } else if !passes { - t.Error("Not passing") - } - */ -} diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index 4c092f476..463b557ab 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -26,7 +26,6 @@ import ( // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried func matchingItemIDsForEvent(ev map[string]interface{}, dataDB DataDB, dbIdxKey string) (itemIDs utils.StringMap, err error) { - fmt.Printf("Event: %+v, dbIdxKey: %s\n", ev, dbIdxKey) itemIDs = make(utils.StringMap) for fldName, fieldValIf := range ev { fldVal, canCast := utils.CastFieldIfToString(fieldValIf) diff --git a/stats/service.go b/stats/service.go index 0cb616157..42f5f8406 100644 --- a/stats/service.go +++ b/stats/service.go @@ -21,11 +21,14 @@ import ( "errors" "fmt" "math/rand" + "reflect" + "strings" "sync" "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func init() { @@ -259,3 +262,29 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err *reply = utils.OK return } + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +// here for testing purposes +func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { + methodSplit := strings.Split(serviceMethod, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1]) + if !method.IsValid() { + return rpcclient.ErrUnsupporteServiceMethod + } + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err +} diff --git a/stats/service_test.go b/stats/service_test.go new file mode 100644 index 000000000..940f752a9 --- /dev/null +++ b/stats/service_test.go @@ -0,0 +1,77 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package stats + +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestReqFilterPassStatS(t *testing.T) { + if cgrCfg := config.CgrConfig(); cgrCfg == nil { + cgrCfg, _ = config.NewDefaultCGRConfig() + config.SetCgrConfig(cgrCfg) + } + dataStorage, _ := engine.NewMapStorage() + dataStorage.SetStatsQueue( + &engine.StatsQueue{ID: "CDRST1", + Filters: []*engine.RequestFilter{ + &engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant", + Values: []string{"cgrates.org"}}}, + Metrics: []string{utils.MetaASR}}) + statS, err := NewStatService(dataStorage, dataStorage.Marshaler(), 0) + if err != nil { + t.Fatal(err) + } + var replyStr string + if err := statS.Call("StatSV1.LoadQueues", ArgsLoadQueues{}, + &replyStr); err != nil { + t.Error(err) + } else if replyStr != utils.OK { + t.Errorf("reply received: %s", replyStr) + } + cdr := &engine.CDR{ + Tenant: "cgrates.org", + Category: "call", + AnswerTime: time.Now(), + SetupTime: time.Now(), + Usage: 10 * time.Second, + Cost: 10, + Supplier: "suppl1", + DisconnectCause: "NORMAL_CLEARNING", + } + cdrMp, _ := cdr.AsMapStringIface() + cdrMp[utils.ID] = "event1" + if err := statS.processEvent(cdrMp); err != nil { + t.Error(err) + } + rf, err := engine.NewRequestFilter(engine.MetaStatS, "", + []string{"CDRST1:*min_asr:20"}) + if err != nil { + t.Fatal(err) + } + if passes, err := rf.Pass(cdr, "", statS); err != nil { + t.Error(err) + } else if !passes { + t.Error("Not passing") + } +} diff --git a/utils/consts.go b/utils/consts.go index 6a8d19e26..ca96c5175 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -418,6 +418,9 @@ const ( CacheResourceLimits = "resource_limits" CacheTimings = "timings" StatS = "stats" + CostSource = "CostSource" + ExtraInfo = "ExtraInfo" + MetaPrefix = "*" ) func buildCacheInstRevPrefixes() {