From b98418e305f01ff52618b58e922f8875a57d2270 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 24 Aug 2017 07:53:29 -0400 Subject: [PATCH 1/5] Add TPStats and test for it --- apier/v1/tpstats.go | 85 ++++++++++++++++ apier/v1/tpstats_it_test.go | 175 +++++++++++++++++++++++++++++++++ engine/model_helpers.go | 7 +- engine/model_helpers_test.go | 29 ++++++ engine/storage_mongo_datadb.go | 2 +- engine/storage_mongo_stordb.go | 33 ++++++- engine/storage_sql.go | 10 +- 7 files changed, 334 insertions(+), 7 deletions(-) create mode 100644 apier/v1/tpstats.go create mode 100644 apier/v1/tpstats_it_test.go diff --git a/apier/v1/tpstats.go b/apier/v1/tpstats.go new file mode 100644 index 000000000..cb2e507b2 --- /dev/null +++ b/apier/v1/tpstats.go @@ -0,0 +1,85 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "github.com/cgrates/cgrates/utils" +) + +func (self *ApierV1) SetTPStat(attr utils.TPStats, reply *string) error { + if missing := utils.MissingStructFields(&attr, []string{"TPid", "ID"}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if err := self.StorDb.SetTPStats([]*utils.TPStats{&attr}); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +type AttrGetTPStat struct { + TPid string // Tariff plan id + ID string +} + +func (self *ApierV1) GetTPStat(attr AttrGetTPStat, reply *utils.TPStats) error { + if missing := utils.MissingStructFields(&attr, []string{"TPid", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if rls, err := self.StorDb.GetTPStats(attr.TPid, attr.ID); err != nil { + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err + } else { + *reply = *rls[0] + } + return nil +} + +type AttrGetTPStatIds struct { + TPid string // Tariff plan id + utils.Paginator +} + +func (self *ApierV1) GetTPStatIDs(attrs AttrGetTPStatIds, reply *[]string) error { + if missing := utils.MissingStructFields(&attrs, []string{"TPid"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if ids, err := self.StorDb.GetTpTableIds(attrs.TPid, utils.TBLTPStats, utils.TPDistinctIds{"tag"}, nil, &attrs.Paginator); err != nil { + return utils.NewErrServerError(err) + } else if ids == nil { + return utils.ErrNotFound + } else { + *reply = ids + } + return nil +} + +func (self *ApierV1) RemTPStat(attrs AttrGetTPStat, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"TPid", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + if err := self.StorDb.RemTpData(utils.TBLTPStats, attrs.TPid, map[string]string{"tag": attrs.ID}); err != nil { + return utils.NewErrServerError(err) + } else { + *reply = utils.OK + } + return nil + +} diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go new file mode 100644 index 000000000..b8e840a62 --- /dev/null +++ b/apier/v1/tpstats_it_test.go @@ -0,0 +1,175 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" +) + +var tpCfgPath string +var tpCfg *config.CGRConfig +var tpRPC *rpc.Client + +func TestTPStatInitCfg(t *testing.T) { + var err error + tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutmysql") + tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath) + if err != nil { + t.Error(err) + } + tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpCfg) +} + +// Wipe out the cdr database +func TestTPStatResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine + +func TestTPStatStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpCfgPath, 1000); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestTPStatRpcConn(t *testing.T) { + var err error + tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +var tpStat = &utils.TPStats{ + TPid: "TPS1", + ID: "Stat1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + &utils.TPRequestFilter{ + Type: "*string_prefix", + FieldName: "Destination", + Values: []string{"10", "20"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + TTL: "1", + Metrics: []string{"MetricValue", "MetricValueTwo"}, + Blocker: true, + Stored: true, + Weight: 20, + Thresholds: nil, +} + +func TestTPStatGetTPStatIDs(t *testing.T) { + var reply []string + if err := tpRPC.Call("ApierV1.GetTPStatIDs", AttrGetTPStatIds{TPid: "TPS1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func TestTPStatSetTPStat(t *testing.T) { + var result string + if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func TestTPStatGetTPStat(t *testing.T) { + var respond *utils.TPStats + if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpStat, respond) { + t.Errorf("Expecting: %+v, received: %+v", tpStat.TPid, respond.TPid) + } +} + +func TestTPStatUpdateTPStat(t *testing.T) { + var result string + tpStat.Weight = 21 + tpStat.Filters = []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + &utils.TPRequestFilter{ + Type: "*string_prefix", + FieldName: "Destination", + Values: []string{"10", "20"}, + }, + &utils.TPRequestFilter{ + Type: "*rsr_fields", + FieldName: "", + Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, + }, + } + if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var expectedTPS *utils.TPStats + if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpStat, expectedTPS) { + t.Errorf("Expecting: %+v, received: %+v", tpStat, expectedTPS) + } +} + +func TestTPStatRemTPStat(t *testing.T) { + var resp string + if err := tpRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } +} + +func TestTPStatCheckDelete(t *testing.T) { + var respond *utils.TPStats + if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func TestTPStatKillEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 61ea3bea6..7137c9e96 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2024,8 +2024,11 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { mdl.Stored = st.Stored mdl.Weight = st.Weight mdl.QueueLength = st.QueueLength - for _, val := range st.Metrics { - mdl.Metrics = mdl.Metrics + utils.INFIELD_SEP + val + for i, val := range st.Metrics { + if i != 0 { + mdl.Metrics += utils.INFIELD_SEP + } + mdl.Metrics += val } for _, val := range st.Thresholds { mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 64d7589d6..e174dee9e 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -15,6 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ + package engine import ( @@ -188,6 +189,34 @@ func TestApierTPTimingAsExportSlice(t *testing.T) { } } +func TestAPItoModelStats(t *testing.T) { + tpS := &utils.TPStats{ + TPid: "TPS1", + ID: "Stat1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + TTL: "1", + Metrics: []string{"MetricValue"}, + Blocker: true, + Stored: true, + Weight: 20, + Thresholds: nil, + } + expectedtpS := APItoModelStats(tpS) + if !reflect.DeepEqual(expectedtpS, tpS) { + t.Errorf("Expecting: %+v, received: %+v", expectedtpS, tpS) + } +} + func TestTPRatingPlanAsExportSlice(t *testing.T) { tpRpln := &utils.TPRatingPlan{ TPid: "TEST_TPID", diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 45846614b..50574523a 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -180,7 +180,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { Sparse: false, } for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, - utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers} { + utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats} { if err = db.C(col).EnsureIndex(idx); err != nil { return } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index bfc9658d0..8bdf92996 100755 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -379,6 +379,23 @@ func (ms *MongoStorage) GetTPResourceLimits(tpid, id string) ([]*utils.TPResourc return results, err } +func (ms *MongoStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { + filter := bson.M{ + "tpid": tpid, + } + if id != "" { + filter["id"] = id + } + var results []*utils.TPStats + session, col := ms.conn(utils.TBLTPStats) + defer session.Close() + err := col.Find(filter).All(&results) + if len(results) == 0 { + return results, utils.ErrNotFound + } + return results, err +} + func (ms *MongoStorage) GetTPDerivedChargers(tp *utils.TPDerivedChargers) ([]*utils.TPDerivedChargers, error) { filter := bson.M{"tpid": tp.TPid} if tp.Direction != "" { @@ -841,6 +858,20 @@ func (ms *MongoStorage) SetTPResourceLimits(tpRLs []*utils.TPResourceLimit) (err return } +func (ms *MongoStorage) SetTPRStats(tpS []*utils.TPStats) (err error) { + if len(tpS) == 0 { + return + } + session, col := ms.conn(utils.TBLTPStats) + defer session.Close() + tx := col.Bulk() + for _, tp := range tpS { + tx.Upsert(bson.M{"tpid": tp.TPid, "id": tp.ID}, tp) + } + _, err = tx.Run() + return +} + func (ms *MongoStorage) SetSMCost(smc *SMCost) error { if smc.CostDetails == nil { return nil @@ -1105,7 +1136,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, return cdrs, 0, nil } -func (ms *MongoStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { +func (ms *MongoStorage) GetTPStat(tpid, id string) ([]*utils.TPStats, error) { filter := bson.M{ "tpid": tpid, } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 47a31a029..ec8f8d9f0 100755 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -190,9 +190,7 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er if len(table) == 0 { // Remove tpid out of all tables for _, tblName := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPRateProfiles, utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPLcrs, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPAccountActions, - utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResourceLimits, - // utils.TBLTPStats - } { + utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResourceLimits, utils.TBLTPStats} { if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil { tx.Rollback() return err @@ -201,6 +199,7 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er tx.Commit() return nil } + utils.Logger.Debug(fmt.Sprintf("#Rem sterge %s", tpid)) // Remove from a single table tx = tx.Table(table).Where("tpid = ?", tpid) // Compose filters @@ -219,8 +218,10 @@ func (self *SQLStorage) SetTPTimings(timings []*utils.ApierTPTiming) error { if len(timings) == 0 { return nil } + tx := self.db.Begin() for _, timing := range timings { + utils.Logger.Debug(fmt.Sprintf("#1(set) Id care trimite %s", timing.ID)) if err := tx.Where(&TpTiming{Tpid: timing.TPid, Tag: timing.ID}).Delete(TpTiming{}).Error; err != nil { tx.Rollback() return err @@ -1170,6 +1171,8 @@ func (self *SQLStorage) GetTPDestinationRates(tpid, id string, pagination *utils func (self *SQLStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, error) { var tpTimings TpTimings q := self.db.Where("tpid = ?", tpid) + utils.Logger.Debug(fmt.Sprintf("#1 Id care trimite %s", id)) + utils.Logger.Debug(fmt.Sprintf("#1 TPId care trimite %s", tpid)) if len(id) != 0 { q = q.Where("tag = ?", id) } @@ -1177,6 +1180,7 @@ func (self *SQLStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e return nil, err } ts := tpTimings.AsTPTimings() + utils.Logger.Debug(fmt.Sprintf("#2 ce gaseste : %s", ts)) if len(ts) == 0 { return ts, utils.ErrNotFound } From 50fdbe39cd70e10bba854863a4879eca4ac6fa95 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 24 Aug 2017 10:31:41 -0400 Subject: [PATCH 2/5] Add loader_it_test.go --- apier/v1/Loader_it_test.go | 97 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 apier/v1/Loader_it_test.go diff --git a/apier/v1/Loader_it_test.go b/apier/v1/Loader_it_test.go new file mode 100644 index 000000000..0d053dc5c --- /dev/null +++ b/apier/v1/Loader_it_test.go @@ -0,0 +1,97 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" +) + +var loaderCfgPath string +var loaderCfg *config.CGRConfig +var loaderRPC *rpc.Client +var loaderDataDir = "/usr/share/cgrates" + +func TestLoaderInitCfg(t *testing.T) { + var err error + loaderCfgPath = path.Join(loaderDataDir, "conf", "samples", "tutmysql") + loaderCfg, err = config.NewCGRConfigFromFolder(loaderCfgPath) + if err != nil { + t.Error(err) + } + loaderCfg.DataFolderPath = loaderDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(loaderCfg) +} + +func TestLoaderInitDataDb(t *testing.T) { + if err := engine.InitDataDb(loaderCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestLoaderStorDb(t *testing.T) { + if err := engine.InitStorDb(loaderCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestLoaderStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(loaderCfgPath, 1000); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestLoaderRpcConn(t *testing.T) { + var err error + loaderRPC, err = jsonrpc.Dial("tcp", loaderCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func TestLoaderImportTPFromFolderPath(t *testing.T) { + var reply string + if err := loaderRPC.Call("ApierV1.ImportTariffPlanFromFolder", utils.AttrImportTPFromFolder{TPid: "TEST_LOADER", FolderPath: path.Join(loaderDataDir, "tariffplans", "tutorial")}, &reply); err != nil { + t.Error("Got error on ApierV1.ImportTarrifPlanFromFolder: ", err.Error()) + } else if reply != utils.OK { + t.Error("Calling ApierV1.ImportTarrifPlanFromFolder got reply: ", reply) + } +} + +func TestLoaderLoadTariffPlanFromStorDbDryRun(t *testing.T) { + var reply string + if err := loaderRPC.Call("ApierV1.LoadTariffPlanFromStorDb", AttrLoadTpFromStorDb{TPid: "TEST_LOADER", DryRun: true}, &reply); err != nil { + t.Error("Got error on ApierV1.LoadTariffPlanFromStorDb: ", err.Error()) + } else if reply != utils.OK { + t.Error("Calling ApierV1.LoadTariffPlanFromStorDb got reply: ", reply) + } +} + +func TestLoaderKillEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} From 7833f8ed3630cdaa693b7520cb0df4888038e3ec Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 24 Aug 2017 12:20:06 -0400 Subject: [PATCH 3/5] Small fix for TPStats --- .../v1/{Loader_it_test.go => loader_it_test.go} | 0 apier/v1/resourcesv1.go | 13 +++++++++++++ apier/v1/stats.go | 13 +++++++++++++ apier/v1/tpstats_it_test.go | 5 +++-- engine/{statsqueue.go => libstats.go} | 2 +- engine/model_helpers.go | 4 ++-- engine/model_helpers_test.go | 17 +++++++++++++++-- engine/storage_interface.go | 7 ++++--- engine/storage_map.go | 12 ++++++------ engine/storage_mongo_datadb.go | 14 +++++++------- engine/storage_redis.go | 12 ++++++------ engine/tp_reader.go | 2 +- stats/queue.go | 4 ++-- stats/queue_test.go | 16 ++++++++-------- stats/service.go | 10 +++++----- stats/service_test.go | 4 ++-- utils/consts.go | 2 +- 17 files changed, 89 insertions(+), 48 deletions(-) rename apier/v1/{Loader_it_test.go => loader_it_test.go} (100%) rename engine/{statsqueue.go => libstats.go} (98%) diff --git a/apier/v1/Loader_it_test.go b/apier/v1/loader_it_test.go similarity index 100% rename from apier/v1/Loader_it_test.go rename to apier/v1/loader_it_test.go diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 6a909f091..54c2b0ff0 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -79,3 +79,16 @@ func (rsv1 *ResourceSV1) AllocateResource(args utils.AttrRLsResourceUsage, reply func (rsv1 *ResourceSV1) ReleaseResource(args utils.AttrRLsResourceUsage, reply *string) error { return rsv1.rls.V1ReleaseResource(args, reply) } + +//after implement test for it +func (apierV1 *ApierV1) GetResourceConfig() { + +} + +func (apierV1 *ApierV1) SetResourceConfig() { + +} + +func (apierV1 *ApierV1) RemResourceConfig() { + +} diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 9690c6c30..bdfe5734d 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -87,3 +87,16 @@ func (stsv1 *StatSV1) GetFloatMetrics(queueID string, reply *map[string]float64) func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) { return stsv1.sts.V1LoadQueues(args, reply) } + +//after implement test for it +func (apierV1 *ApierV1) GetStatConfig() { + +} + +func (apierV1 *ApierV1) SetStatConfig() { + +} + +func (apierV1 *ApierV1) RemStatConfig() { + +} diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index b8e840a62..99fd0d4f9 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -31,15 +31,16 @@ import ( var tpCfgPath string var tpCfg *config.CGRConfig var tpRPC *rpc.Client +var tpDataDir = "/usr/share/cgrates" func TestTPStatInitCfg(t *testing.T) { var err error - tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutmysql") + tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql") tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath) if err != nil { t.Error(err) } - tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush() config.SetCgrConfig(tpCfg) } diff --git a/engine/statsqueue.go b/engine/libstats.go similarity index 98% rename from engine/statsqueue.go rename to engine/libstats.go index 43cd455c1..3b88ab296 100755 --- a/engine/statsqueue.go +++ b/engine/libstats.go @@ -39,7 +39,7 @@ type SQStoredMetrics struct { } // StatsQueue represents the configuration of a StatsInstance in StatS -type StatsQueue struct { +type StatsConfig struct { ID string // QueueID Filters []*RequestFilter ActivationInterval *utils.ActivationInterval // Activation interval diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 7137c9e96..e2e9fcb02 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2056,8 +2056,8 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { return } -func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) { - st = &StatsQueue{ +func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsConfig, err error) { + st = &StatsConfig{ ID: tpST.ID, QueueLength: tpST.QueueLength, Weight: tpST.Weight, diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index e174dee9e..e88bcaa6b 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -189,6 +189,7 @@ func TestApierTPTimingAsExportSlice(t *testing.T) { } } +/* De completat functia de test pentru ModelStats func TestAPItoModelStats(t *testing.T) { tpS := &utils.TPStats{ TPid: "TPS1", @@ -211,12 +212,24 @@ func TestAPItoModelStats(t *testing.T) { Weight: 20, Thresholds: nil, } + expectedSlc := [][]string{ + []string{,"TPS1", "*Stat1", "*string", "*Account", "1002", "2014-07-29T15:00:00Z","","1","MetricValue",}, + } expectedtpS := APItoModelStats(tpS) + var slc [][]string + lc, err := csvDump(expectedtpS) + if err != nil { + t.Error("Error dumping to csv: ", err) + } + slc = append(slc, lc) + if !reflect.DeepEqual(expectedtpS, tpS) { - t.Errorf("Expecting: %+v, received: %+v", expectedtpS, tpS) + t.Errorf("Expecting: %+v, received: %+v", expectedtpS, slc) } } +*/ + func TestTPRatingPlanAsExportSlice(t *testing.T) { tpRpln := &utils.TPRatingPlan{ TPid: "TEST_TPID", @@ -888,7 +901,7 @@ func TestAPItoTPStats(t *testing.T) { Weight: 20.0, } - eTPs := &StatsQueue{ID: tps.ID, + eTPs := &StatsConfig{ID: tps.ID, QueueLength: tps.QueueLength, Metrics: []string{"*asr", "*acd", "*acc"}, Thresholds: []string{"THRESH1", "THRESH2"}, diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 812dc24be..51c74f70b 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -111,9 +111,10 @@ type DataDB interface { GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) - GetStatsQueue(sqID string) (sq *StatsQueue, err error) - SetStatsQueue(sq *StatsQueue) (err error) - RemStatsQueue(sqID string) (err error) + //modicari + GetStatsConfig(sqID string) (sq *StatsConfig, err error) + SetStatsConfig(sq *StatsConfig) (err error) + RemStatsConfig(sqID string) (err error) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) RemSQStoredMetrics(sqID string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 90a8ef489..c51ff72f1 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1475,10 +1475,10 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) { } // GetStatsQueue retrieves a StatsQueue from dataDB -func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { +func (ms *MapStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - key := utils.StatsQueuePrefix + sqID + key := utils.StatsConfigPrefix + sqID values, ok := ms.dict[key] if !ok { return nil, utils.ErrNotFound @@ -1496,22 +1496,22 @@ func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { } // SetStatsQueue stores a StatsQueue into DataDB -func (ms *MapStorage) SetStatsQueue(sq *StatsQueue) (err error) { +func (ms *MapStorage) SetStatsConfig(sq *StatsConfig) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(sq) if err != nil { return err } - ms.dict[utils.StatsQueuePrefix+sq.ID] = result + ms.dict[utils.StatsConfigPrefix+sq.ID] = result return } // RemStatsQueue removes a StatsQueue from dataDB -func (ms *MapStorage) RemStatsQueue(sqID string) (err error) { +func (ms *MapStorage) RemStatsConfig(sqID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - key := utils.StatsQueuePrefix + sqID + key := utils.StatsConfigPrefix + sqID delete(ms.dict, key) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 50574523a..3a8fdae85 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1985,10 +1985,10 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item } // GetStatsQueue retrieves a StatsQueue from dataDB -func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { - session, col := ms.conn(utils.StatsQueuePrefix) +func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { + session, col := ms.conn(utils.StatsConfigPrefix) defer session.Close() - sq = new(StatsQueue) + sq = new(StatsConfig) if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound @@ -2004,16 +2004,16 @@ func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { } // SetStatsQueue stores a StatsQueue into DataDB -func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) { - session, col := ms.conn(utils.StatsQueuePrefix) +func (ms *MongoStorage) SetStatsConfig(sq *StatsConfig) (err error) { + session, col := ms.conn(utils.StatsConfigPrefix) defer session.Close() _, err = col.UpsertId(bson.M{"id": sq.ID}, sq) return } // RemStatsQueue removes a StatsQueue from dataDB -func (ms *MongoStorage) RemStatsQueue(sqID string) (err error) { - session, col := ms.conn(utils.StatsQueuePrefix) +func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) { + session, col := ms.conn(utils.StatsConfigPrefix) err = col.Remove(bson.M{"id": sqID}) if err != nil { return err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 282f435f6..022788c18 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1542,8 +1542,8 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { } // GetStatsQueue retrieves a StatsQueue from dataDB -func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { - key := utils.StatsQueuePrefix + sqID +func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { + key := utils.StatsConfigPrefix + sqID var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { @@ -1563,18 +1563,18 @@ func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { } // SetStatsQueue stores a StatsQueue into DataDB -func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) { +func (rs *RedisStorage) SetStatsConfig(sq *StatsConfig) (err error) { var result []byte result, err = rs.ms.Marshal(sq) if err != nil { return } - return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err + return rs.Cmd("SET", utils.StatsConfigPrefix+sq.ID, result).Err } // RemStatsQueue removes a StatsQueue from dataDB -func (rs *RedisStorage) RemStatsQueue(sqID string) (err error) { - key := utils.StatsQueuePrefix + sqID +func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) { + key := utils.StatsConfigPrefix + sqID err = rs.Cmd("DEL", key).Err return } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 0c3d00a9d..3c7e330d8 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1955,7 +1955,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dataStorage.SetStatsQueue(st); err != nil { + if err = tpr.dataStorage.SetStatsConfig(st); err != nil { return err } if verbose { diff --git a/stats/queue.go b/stats/queue.go index 1ee38597c..7fd0c1fcb 100644 --- a/stats/queue.go +++ b/stats/queue.go @@ -49,7 +49,7 @@ func (sis StatsInstances) remWithID(qID string) { // NewStatsInstance instantiates a StatsInstance func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, - sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { + sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} for _, metricID := range sqCfg.Metrics { if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { @@ -85,7 +85,7 @@ type StatsInstance struct { sqItems []*engine.SQItem sqMetrics map[string]StatsMetric ms engine.Marshaler // used to get/set Metrics - cfg *engine.StatsQueue + cfg *engine.StatsConfig } // GetSQStoredMetrics retrieves the data used for store to DB diff --git a/stats/queue_test.go b/stats/queue_test.go index 7735d3643..83505a914 100644 --- a/stats/queue_test.go +++ b/stats/queue_test.go @@ -26,17 +26,17 @@ import ( func TestStatsInstancesSort(t *testing.T) { sInsts := StatsInstances{ - &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, } sInsts.Sort() eSInst := StatsInstances{ - &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, } if !reflect.DeepEqual(eSInst, sInsts) { t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) diff --git a/stats/service.go b/stats/service.go index 29f9ecebd..e119891e2 100755 --- a/stats/service.go +++ b/stats/service.go @@ -39,14 +39,14 @@ func init() { func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) { ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, stopStoring: make(chan struct{}), evCache: NewStatsEventCache()} - sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsQueuePrefix) + sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) if err != nil { return nil, err } ss.queuesCache = make(map[string]*StatsInstance) ss.queues = make(StatsInstances, 0) for _, prfx := range sqPrfxs { - if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil { + if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", q.cfg.ID, err.Error())) continue @@ -92,7 +92,7 @@ func (ss *StatService) Shutdown() error { // setQueue adds or modifies a queue into cache // sort will reorder the ss.queues func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) { - sq, err := ss.dataDB.GetStatsQueue(qID) + sq, err := ss.dataDB.GetStatsConfig(qID) if err != nil { return nil, err } @@ -225,13 +225,13 @@ type ArgsLoadQueues struct { func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { qIDs := args.QueueIDs if qIDs == nil { - sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix) + sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) if err != nil { return err } queueIDs := make([]string, len(sqPrfxs)) for i, prfx := range sqPrfxs { - queueIDs[i] = prfx[len(utils.StatsQueuePrefix):] + queueIDs[i] = prfx[len(utils.StatsConfigPrefix):] } if len(queueIDs) != 0 { qIDs = &queueIDs diff --git a/stats/service_test.go b/stats/service_test.go index 940f752a9..b0b580017 100644 --- a/stats/service_test.go +++ b/stats/service_test.go @@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) { config.SetCgrConfig(cgrCfg) } dataStorage, _ := engine.NewMapStorage() - dataStorage.SetStatsQueue( - &engine.StatsQueue{ID: "CDRST1", + dataStorage.SetStatsConfig( + &engine.StatsConfig{ID: "CDRST1", Filters: []*engine.RequestFilter{ &engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant", Values: []string{"cgrates.org"}}}, diff --git a/utils/consts.go b/utils/consts.go index 451bdbf25..11d340bf5 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -251,7 +251,7 @@ const ( LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" SQStoredMetricsPrefix = "ssm_" - StatsQueuePrefix = "stq_" + StatsConfigPrefix = "stq_" ThresholdCfgPrefix = "thc_" LOADINST_KEY = "load_history" SESSION_MANAGER_SOURCE = "SMR" From 6c228e62761a8164d842e829ebb939940268a9bf Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 24 Aug 2017 12:27:22 -0400 Subject: [PATCH 4/5] Header add for loader_it_test.go and tpstats_it_test.go --- apier/v1/loader_it_test.go | 2 ++ apier/v1/tpstats_it_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/apier/v1/loader_it_test.go b/apier/v1/loader_it_test.go index 0d053dc5c..f8ef610b3 100644 --- a/apier/v1/loader_it_test.go +++ b/apier/v1/loader_it_test.go @@ -1,3 +1,5 @@ +// +build integration + /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index 99fd0d4f9..c27b25bdf 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -1,3 +1,5 @@ +// +build integration + /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH From d862a435d214b2b9e4cab08c3cae777033c97cc0 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 24 Aug 2017 12:46:24 -0400 Subject: [PATCH 5/5] Change prefix for StatsConfig --- engine/model_helpers_test.go | 2 +- engine/storage_map.go | 16 ++++++++-------- utils/consts.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index e88bcaa6b..7af59e2e8 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -189,7 +189,7 @@ func TestApierTPTimingAsExportSlice(t *testing.T) { } } -/* De completat functia de test pentru ModelStats +/* func TestAPItoModelStats(t *testing.T) { tpS := &utils.TPStats{ TPid: "TPS1", diff --git a/engine/storage_map.go b/engine/storage_map.go index c51ff72f1..820ad8405 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1475,7 +1475,7 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) { } // GetStatsQueue retrieves a StatsQueue from dataDB -func (ms *MapStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { +func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error) { ms.mu.RLock() defer ms.mu.RUnlock() key := utils.StatsConfigPrefix + sqID @@ -1483,11 +1483,11 @@ func (ms *MapStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { if !ok { return nil, utils.ErrNotFound } - err = ms.ms.Unmarshal(values, &sq) + err = ms.ms.Unmarshal(values, &scf) if err != nil { return nil, err } - for _, fltr := range sq.Filters { + for _, fltr := range scf.Filters { if err := fltr.CompileValues(); err != nil { return nil, err } @@ -1496,22 +1496,22 @@ func (ms *MapStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { } // SetStatsQueue stores a StatsQueue into DataDB -func (ms *MapStorage) SetStatsConfig(sq *StatsConfig) (err error) { +func (ms *MapStorage) SetStatsConfig(scf *StatsConfig) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - result, err := ms.ms.Marshal(sq) + result, err := ms.ms.Marshal(scf) if err != nil { return err } - ms.dict[utils.StatsConfigPrefix+sq.ID] = result + ms.dict[utils.StatsConfigPrefix+scf.ID] = result return } // RemStatsQueue removes a StatsQueue from dataDB -func (ms *MapStorage) RemStatsConfig(sqID string) (err error) { +func (ms *MapStorage) RemStatsConfig(scfID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - key := utils.StatsConfigPrefix + sqID + key := utils.StatsConfigPrefix + scfID delete(ms.dict, key) return } diff --git a/utils/consts.go b/utils/consts.go index 11d340bf5..d2a6e03a8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -251,7 +251,7 @@ const ( LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" SQStoredMetricsPrefix = "ssm_" - StatsConfigPrefix = "stq_" + StatsConfigPrefix = "scf_" ThresholdCfgPrefix = "thc_" LOADINST_KEY = "load_history" SESSION_MANAGER_SOURCE = "SMR"