diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index dc00d2813..7f31fb5ca 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -48,6 +48,11 @@ func (tSv1 *ThresholdSV1) GetThresholdsForEvent(ev *engine.ThresholdEvent, reply return tSv1.tS.V1GetThresholdsForEvent(ev, reply) } +// GetThreshold queries a Threshold +func (tSv1 *ThresholdSV1) GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error { + return tSv1.tS.V1GetThreshold(tntID, t) +} + // ProcessEvent will process an Event func (tSv1 *ThresholdSV1) ProcessEvent(ev *engine.ThresholdEvent, reply *string) error { return tSv1.tS.V1ProcessEvent(ev, reply) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go new file mode 100644 index 000000000..56e78dbe2 --- /dev/null +++ b/apier/v1/thresholds_it_test.go @@ -0,0 +1,365 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + tSv1CfgPath string + tSv1Cfg *config.CGRConfig + tSv1Rpc *rpc.Client + tPrfl *engine.ThresholdProfile + tSv1ConfDIR string //run tests for specific configuration + thdsDelay int +) + +var tEvs = []*engine.ThresholdEvent{ + &engine.ThresholdEvent{ + Tenant: "cgrates.org", + ID: "event1", + Fields: map[string]interface{}{ + utils.EventSource: utils.StatService, + utils.StatID: "Stats1", + utils.MetaASR: 35.0, + utils.MetaACD: time.Duration(2*time.Minute + 45*time.Second), + utils.MetaTCC: 12.7, + utils.MetaTCD: time.Duration(12*time.Minute + 15*time.Second), + utils.MetaACC: 0.75, + utils.MetaPDD: time.Duration(2 * time.Second), + }}, + &engine.ThresholdEvent{ + Tenant: "cgrates.org", + ID: "event2", + Fields: map[string]interface{}{ + utils.EventSource: utils.AccountService, + utils.AccountID: "1002", + utils.BalanceType: utils.MONETARY, + utils.BalanceID: utils.META_DEFAULT, + utils.BalanceValue: 12.3}}, + &engine.ThresholdEvent{ + Tenant: "cgrates.org", + ID: "event3", + Fields: map[string]interface{}{ + utils.EventSource: utils.ResourceS, + utils.ResourceID: "ResGroup1", + utils.USAGE: 10.0}}, +} + +var sTestsThresholdSV1 = []func(t *testing.T){ + testV1TSLoadConfig, + testV1TSInitDataDb, + testV1TSStartEngine, + testV1TSRpcConn, + testV1TSFromFolder, + testV1TSGetThresholds, + //testV1STSProcessEvent, + //testV1TSGetThresholdsAfterRestart, + //testV1STSSetThresholdProfile, + //testV1STSUpdateThresholdProfile, + //testV1STSRemoveThresholdProfile, + testV1TSStopEngine, +} + +// Test start here +func TestTSV1ITMySQL(t *testing.T) { + tSv1ConfDIR = "tutmysql" + for _, stest := range sTestsThresholdSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + +func TestTSV1ITMongo(t *testing.T) { + tSv1ConfDIR = "tutmongo" + time.Sleep(time.Duration(5 * time.Second)) // give time for engine to start + for _, stest := range sTestsThresholdSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + +func testV1TSLoadConfig(t *testing.T) { + var err error + tSv1CfgPath = path.Join(*dataDir, "conf", "samples", tSv1ConfDIR) + if tSv1Cfg, err = config.NewCGRConfigFromFolder(tSv1CfgPath); err != nil { + t.Error(err) + } + switch tSv1ConfDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + thdsDelay = 4000 + default: + thdsDelay = 1000 + } +} + +func testV1TSInitDataDb(t *testing.T) { + if err := engine.InitDataDb(tSv1Cfg); err != nil { + t.Fatal(err) + } +} + +func testV1TSStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil { + t.Fatal(err) + } +} + +func testV1TSRpcConn(t *testing.T) { + var err error + tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testV1TSFromFolder(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := tSv1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } +} + +func testV1TSGetThresholds(t *testing.T) { + var tIDs []string + expectedIDs := []string{"Threshold1"} + if err := tSv1Rpc.Call("ThresholdSV1.GetThresholdIDs", "cgrates.org", &tIDs); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedIDs, tIDs) { + t.Errorf("expecting: %+v, received reply: %s", expectedIDs, tIDs) + } + var td engine.Threshold + eTd := engine.Threshold{Tenant: "cgrates.org", ID: expectedIDs[0]} + if err := tSv1Rpc.Call("ThresholdSV1.GetThreshold", + &utils.TenantID{Tenant: "cgrates.org", ID: expectedIDs[0]}, &td); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eTd, td) { + t.Errorf("expecting: %+v, received: %+v", eTd, td) + } +} + +/* + +func testV1STSProcessEvent(t *testing.T) { + var reply string + ev1 := engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event1", + Fields: map[string]interface{}{ + utils.ACCOUNT: "1001", + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.USAGE: time.Duration(135 * time.Second), + utils.COST: 123.0, + utils.PDD: time.Duration(12 * time.Second)}} + if err := tSv1Rpc.Call("StatSV1.ProcessEvent", &ev1, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + //process with one event (should be N/A becaus MinItems is 2) + expectedMetrics := map[string]string{ + utils.MetaASR: utils.NOT_AVAILABLE, + utils.MetaACD: utils.NOT_AVAILABLE, + utils.MetaTCC: utils.NOT_AVAILABLE, + utils.MetaTCD: utils.NOT_AVAILABLE, + utils.MetaACC: utils.NOT_AVAILABLE, + utils.MetaPDD: utils.NOT_AVAILABLE, + } + var metrics map[string]string + if err := tSv1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics, metrics) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) + } + ev2 := engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event2", + Fields: map[string]interface{}{ + utils.ACCOUNT: "1002", + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.USAGE: time.Duration(45 * time.Second)}} + if err := tSv1Rpc.Call("StatSV1.ProcessEvent", &ev2, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + ev3 := &engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event3", + Fields: map[string]interface{}{ + utils.ACCOUNT: "1002", + utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.USAGE: 0}} + if err := tSv1Rpc.Call("StatSV1.ProcessEvent", &ev3, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + expectedMetrics2 := map[string]string{ + utils.MetaASR: "66.66667%", + utils.MetaACD: "1m30s", + utils.MetaACC: "61.5", + utils.MetaTCD: "3m0s", + utils.MetaTCC: "123", + utils.MetaPDD: "4s", + } + var metrics2 map[string]string + if err := tSv1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics2, metrics2) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics2, metrics2) + } +} + +func testV1TSGetThresholdsAfterRestart(t *testing.T) { + time.Sleep(time.Second) + if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil { + t.Fatal(err) + } + var err error + tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } + //get stats metrics after restart + expectedMetrics2 := map[string]string{ + utils.MetaASR: "66.66667%", + utils.MetaACD: "1m30s", + utils.MetaACC: "61.5", + utils.MetaTCD: "3m0s", + utils.MetaTCC: "123", + utils.MetaPDD: "4s", + } + var metrics2 map[string]string + if err := tSv1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics2, metrics2) { + t.Errorf("After restat expecting: %+v, received reply: %s", expectedMetrics2, metrics2) + } + time.Sleep(time.Duration(1 * time.Second)) +} + +func testV1STSSetThresholdProfile(t *testing.T) { + var reply *engine.ThresholdProfile + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + tPrfl = &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TEST_PROFILE1", + Filters: []*engine.RequestFilter{ + &engine.RequestFilter{ + Type: "type", + FieldName: "Name", + Values: []string{"FilterValue1", "FilterValue2"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + QueueLength: 10, + TTL: time.Duration(10) * time.Second, + Metrics: []string{"MetricValue", "MetricValueTwo"}, + Thresholds: []string{"Val1", "Val2"}, + Blocker: true, + Stored: true, + Weight: 20, + MinItems: 1, + } + var result string + if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + } +} + +func testV1STSUpdateThresholdProfile(t *testing.T) { + var result string + tPrfl.Filters = []*engine.RequestFilter{ + &engine.RequestFilter{ + Type: "type", + FieldName: "Name", + Values: []string{"FilterValue1", "FilterValue2"}, + }, + &engine.RequestFilter{ + Type: "*string", + FieldName: "Accout", + Values: []string{"1001", "1002"}, + }, + &engine.RequestFilter{ + Type: "*string_prefix", + FieldName: "Destination", + Values: []string{"10", "20"}, + }, + } + if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + time.Sleep(time.Duration(1 * time.Second)) + var reply *engine.ThresholdProfile + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + } +} + +func testV1STSRemoveThresholdProfile(t *testing.T) { + var resp string + if err := tSv1Rpc.Call("ApierV1.RemThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } + var sqp *engine.ThresholdProfile + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} +*/ +func testV1TSStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/engine/storage_map.go b/engine/storage_map.go index f6a5ba3be..135ea6332 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -317,7 +317,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { switch categ { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, - utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdProfilePrefix: + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index ca677a060..c1c2c2709 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -739,7 +739,7 @@ func (ms *MongoStorage) HasData(category, subject string) (has bool, err error) case utils.StatQueuePrefix: count, err = db.C(colRes).Find(bson.M{"id": subject}).Count() has = count > 0 - case utils.ThresholdProfilePrefix: + case utils.ThresholdPrefix: count, err = db.C(colTps).Find(bson.M{"id": subject}).Count() has = count > 0 case utils.FilterPrefix: diff --git a/engine/storage_redis.go b/engine/storage_redis.go index b49d76d15..ca207d3da 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -335,7 +335,7 @@ func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, - utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdProfilePrefix, utils.FilterPrefix: + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix, utils.FilterPrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } diff --git a/engine/thresholds.go b/engine/thresholds.go index e727b953f..da0064f6d 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -371,3 +371,13 @@ func (tS *ThresholdService) V1GetThresholdIDs(tenant string, tIDs *[]string) (er *tIDs = retIDs return } + +// V1GetThreshold retrieves a Threshold +func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) (err error) { + if thd, err := tS.dm.DataDB().GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil { + return err + } else { + *t = *thd + } + return +} diff --git a/engine/tp_reader.go b/engine/tp_reader.go index bcff0b566..cbac38bd7 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1675,7 +1675,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { for tenant, mpID := range mapTHs { for thID := range mpID { thTntID := &utils.TenantID{Tenant: tenant, ID: thID} - if has, err := tpr.dataStorage.HasData(utils.ThresholdProfilePrefix, thTntID.TenantID()); err != nil { + if has, err := tpr.dataStorage.HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil { return err } else if !has { tpr.thresholds = append(tpr.thresholds, thTntID) @@ -1708,7 +1708,7 @@ func (tpr *TpReader) LoadFilterFiltered(tag string) error { if has, err := tpr.dataStorage.HasData(utils.FilterPrefix, thTntID.TenantID()); err != nil { return err } else if !has { - tpr.thresholds = append(tpr.filters, thTntID) + tpr.filters = append(tpr.filters, thTntID) } } } @@ -2078,7 +2078,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Print("Thresholds:") + log.Print("ThresholdProfiles:") } for _, mpID := range tpr.thProfiles { for _, tpTH := range mpID { @@ -2094,6 +2094,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } + if verbose { + log.Print("Thresholds:") + } + for _, thd := range tpr.thresholds { + if err = tpr.dataStorage.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + return err + } + if verbose { + log.Print("\t", thd.TenantID()) + } + } if verbose { log.Print("Filters:") } diff --git a/utils/consts.go b/utils/consts.go index 3d42924a0..17f7f1317 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -465,6 +465,11 @@ const ( AccountID = "AccountID" ResourceID = "ResourceID" TotalUsage = "TotalUsage" + StatID = "StatID" + BalanceType = "BalanceType" + BalanceID = "BalanceID" + BalanceValue = "BalanceValue" + ResourceS = "ResourceS" ) func buildCacheInstRevPrefixes() {