diff --git a/apier/v1/loader_it_test.go b/apier/v1/loader_it_test.go
new file mode 100644
index 000000000..f8ef610b3
--- /dev/null
+++ b/apier/v1/loader_it_test.go
@@ -0,0 +1,99 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "path"
+ "testing"
+)
+
+var loaderCfgPath string
+var loaderCfg *config.CGRConfig
+var loaderRPC *rpc.Client
+var loaderDataDir = "/usr/share/cgrates"
+
+func TestLoaderInitCfg(t *testing.T) {
+ var err error
+ loaderCfgPath = path.Join(loaderDataDir, "conf", "samples", "tutmysql")
+ loaderCfg, err = config.NewCGRConfigFromFolder(loaderCfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+ loaderCfg.DataFolderPath = loaderDataDir // Share DataFolderPath through config towards StoreDb for Flush()
+ config.SetCgrConfig(loaderCfg)
+}
+
+func TestLoaderInitDataDb(t *testing.T) {
+ if err := engine.InitDataDb(loaderCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Wipe out the cdr database
+func TestLoaderStorDb(t *testing.T) {
+ if err := engine.InitStorDb(loaderCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Start CGR Engine
+func TestLoaderStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(loaderCfgPath, 1000); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func TestLoaderRpcConn(t *testing.T) {
+ var err error
+ loaderRPC, err = jsonrpc.Dial("tcp", loaderCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestLoaderImportTPFromFolderPath(t *testing.T) {
+ var reply string
+ if err := loaderRPC.Call("ApierV1.ImportTariffPlanFromFolder", utils.AttrImportTPFromFolder{TPid: "TEST_LOADER", FolderPath: path.Join(loaderDataDir, "tariffplans", "tutorial")}, &reply); err != nil {
+ t.Error("Got error on ApierV1.ImportTarrifPlanFromFolder: ", err.Error())
+ } else if reply != utils.OK {
+ t.Error("Calling ApierV1.ImportTarrifPlanFromFolder got reply: ", reply)
+ }
+}
+
+func TestLoaderLoadTariffPlanFromStorDbDryRun(t *testing.T) {
+ var reply string
+ if err := loaderRPC.Call("ApierV1.LoadTariffPlanFromStorDb", AttrLoadTpFromStorDb{TPid: "TEST_LOADER", DryRun: true}, &reply); err != nil {
+ t.Error("Got error on ApierV1.LoadTariffPlanFromStorDb: ", err.Error())
+ } else if reply != utils.OK {
+ t.Error("Calling ApierV1.LoadTariffPlanFromStorDb got reply: ", reply)
+ }
+}
+
+func TestLoaderKillEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go
index 6a909f091..54c2b0ff0 100644
--- a/apier/v1/resourcesv1.go
+++ b/apier/v1/resourcesv1.go
@@ -79,3 +79,16 @@ func (rsv1 *ResourceSV1) AllocateResource(args utils.AttrRLsResourceUsage, reply
func (rsv1 *ResourceSV1) ReleaseResource(args utils.AttrRLsResourceUsage, reply *string) error {
return rsv1.rls.V1ReleaseResource(args, reply)
}
+
+//after implement test for it
+func (apierV1 *ApierV1) GetResourceConfig() {
+
+}
+
+func (apierV1 *ApierV1) SetResourceConfig() {
+
+}
+
+func (apierV1 *ApierV1) RemResourceConfig() {
+
+}
diff --git a/apier/v1/stats.go b/apier/v1/stats.go
index 9690c6c30..bdfe5734d 100644
--- a/apier/v1/stats.go
+++ b/apier/v1/stats.go
@@ -87,3 +87,16 @@ func (stsv1 *StatSV1) GetFloatMetrics(queueID string, reply *map[string]float64)
func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
return stsv1.sts.V1LoadQueues(args, reply)
}
+
+//after implement test for it
+func (apierV1 *ApierV1) GetStatConfig() {
+
+}
+
+func (apierV1 *ApierV1) SetStatConfig() {
+
+}
+
+func (apierV1 *ApierV1) RemStatConfig() {
+
+}
diff --git a/apier/v1/tpstats.go b/apier/v1/tpstats.go
new file mode 100644
index 000000000..cb2e507b2
--- /dev/null
+++ b/apier/v1/tpstats.go
@@ -0,0 +1,85 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ "github.com/cgrates/cgrates/utils"
+)
+
+func (self *ApierV1) SetTPStat(attr utils.TPStats, reply *string) error {
+ if missing := utils.MissingStructFields(&attr, []string{"TPid", "ID"}); len(missing) != 0 {
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if err := self.StorDb.SetTPStats([]*utils.TPStats{&attr}); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+type AttrGetTPStat struct {
+ TPid string // Tariff plan id
+ ID string
+}
+
+func (self *ApierV1) GetTPStat(attr AttrGetTPStat, reply *utils.TPStats) error {
+ if missing := utils.MissingStructFields(&attr, []string{"TPid", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if rls, err := self.StorDb.GetTPStats(attr.TPid, attr.ID); err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ err = utils.NewErrServerError(err)
+ }
+ return err
+ } else {
+ *reply = *rls[0]
+ }
+ return nil
+}
+
+type AttrGetTPStatIds struct {
+ TPid string // Tariff plan id
+ utils.Paginator
+}
+
+func (self *ApierV1) GetTPStatIDs(attrs AttrGetTPStatIds, reply *[]string) error {
+ if missing := utils.MissingStructFields(&attrs, []string{"TPid"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if ids, err := self.StorDb.GetTpTableIds(attrs.TPid, utils.TBLTPStats, utils.TPDistinctIds{"tag"}, nil, &attrs.Paginator); err != nil {
+ return utils.NewErrServerError(err)
+ } else if ids == nil {
+ return utils.ErrNotFound
+ } else {
+ *reply = ids
+ }
+ return nil
+}
+
+func (self *ApierV1) RemTPStat(attrs AttrGetTPStat, reply *string) error {
+ if missing := utils.MissingStructFields(&attrs, []string{"TPid", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if err := self.StorDb.RemTpData(utils.TBLTPStats, attrs.TPid, map[string]string{"tag": attrs.ID}); err != nil {
+ return utils.NewErrServerError(err)
+ } else {
+ *reply = utils.OK
+ }
+ return nil
+
+}
diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go
new file mode 100644
index 000000000..c27b25bdf
--- /dev/null
+++ b/apier/v1/tpstats_it_test.go
@@ -0,0 +1,178 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "path"
+ "reflect"
+ "testing"
+)
+
+var tpCfgPath string
+var tpCfg *config.CGRConfig
+var tpRPC *rpc.Client
+var tpDataDir = "/usr/share/cgrates"
+
+func TestTPStatInitCfg(t *testing.T) {
+ var err error
+ tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql")
+ tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+ tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush()
+ config.SetCgrConfig(tpCfg)
+}
+
+// Wipe out the cdr database
+func TestTPStatResetStorDb(t *testing.T) {
+ if err := engine.InitStorDb(tpCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Start CGR Engine
+
+func TestTPStatStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(tpCfgPath, 1000); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func TestTPStatRpcConn(t *testing.T) {
+ var err error
+ tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+var tpStat = &utils.TPStats{
+ TPid: "TPS1",
+ ID: "Stat1",
+ Filters: []*utils.TPRequestFilter{
+ &utils.TPRequestFilter{
+ Type: "*string",
+ FieldName: "Account",
+ Values: []string{"1001", "1002"},
+ },
+ &utils.TPRequestFilter{
+ Type: "*string_prefix",
+ FieldName: "Destination",
+ Values: []string{"10", "20"},
+ },
+ },
+ ActivationInterval: &utils.TPActivationInterval{
+ ActivationTime: "2014-07-29T15:00:00Z",
+ ExpiryTime: "",
+ },
+ TTL: "1",
+ Metrics: []string{"MetricValue", "MetricValueTwo"},
+ Blocker: true,
+ Stored: true,
+ Weight: 20,
+ Thresholds: nil,
+}
+
+func TestTPStatGetTPStatIDs(t *testing.T) {
+ var reply []string
+ if err := tpRPC.Call("ApierV1.GetTPStatIDs", AttrGetTPStatIds{TPid: "TPS1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+}
+
+func TestTPStatSetTPStat(t *testing.T) {
+ var result string
+ if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+}
+
+func TestTPStatGetTPStat(t *testing.T) {
+ var respond *utils.TPStats
+ if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(tpStat, respond) {
+ t.Errorf("Expecting: %+v, received: %+v", tpStat.TPid, respond.TPid)
+ }
+}
+
+func TestTPStatUpdateTPStat(t *testing.T) {
+ var result string
+ tpStat.Weight = 21
+ tpStat.Filters = []*utils.TPRequestFilter{
+ &utils.TPRequestFilter{
+ Type: "*string",
+ FieldName: "Account",
+ Values: []string{"1001", "1002"},
+ },
+ &utils.TPRequestFilter{
+ Type: "*string_prefix",
+ FieldName: "Destination",
+ Values: []string{"10", "20"},
+ },
+ &utils.TPRequestFilter{
+ Type: "*rsr_fields",
+ FieldName: "",
+ Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
+ },
+ }
+ if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ var expectedTPS *utils.TPStats
+ if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(tpStat, expectedTPS) {
+ t.Errorf("Expecting: %+v, received: %+v", tpStat, expectedTPS)
+ }
+}
+
+func TestTPStatRemTPStat(t *testing.T) {
+ var resp string
+ if err := tpRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil {
+ t.Error(err)
+ } else if resp != utils.OK {
+ t.Error("Unexpected reply returned", resp)
+ }
+}
+
+func TestTPStatCheckDelete(t *testing.T) {
+ var respond *utils.TPStats
+ if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+}
+
+func TestTPStatKillEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/engine/statsqueue.go b/engine/libstats.go
similarity index 98%
rename from engine/statsqueue.go
rename to engine/libstats.go
index 43cd455c1..3b88ab296 100755
--- a/engine/statsqueue.go
+++ b/engine/libstats.go
@@ -39,7 +39,7 @@ type SQStoredMetrics struct {
}
// StatsQueue represents the configuration of a StatsInstance in StatS
-type StatsQueue struct {
+type StatsConfig struct {
ID string // QueueID
Filters []*RequestFilter
ActivationInterval *utils.ActivationInterval // Activation interval
diff --git a/engine/model_helpers.go b/engine/model_helpers.go
index 61ea3bea6..e2e9fcb02 100755
--- a/engine/model_helpers.go
+++ b/engine/model_helpers.go
@@ -2024,8 +2024,11 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
mdl.Stored = st.Stored
mdl.Weight = st.Weight
mdl.QueueLength = st.QueueLength
- for _, val := range st.Metrics {
- mdl.Metrics = mdl.Metrics + utils.INFIELD_SEP + val
+ for i, val := range st.Metrics {
+ if i != 0 {
+ mdl.Metrics += utils.INFIELD_SEP
+ }
+ mdl.Metrics += val
}
for _, val := range st.Thresholds {
mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val
@@ -2053,8 +2056,8 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
return
}
-func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) {
- st = &StatsQueue{
+func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsConfig, err error) {
+ st = &StatsConfig{
ID: tpST.ID,
QueueLength: tpST.QueueLength,
Weight: tpST.Weight,
diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go
index 64d7589d6..7af59e2e8 100755
--- a/engine/model_helpers_test.go
+++ b/engine/model_helpers_test.go
@@ -15,6 +15,7 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
+
package engine
import (
@@ -188,6 +189,47 @@ func TestApierTPTimingAsExportSlice(t *testing.T) {
}
}
+/*
+func TestAPItoModelStats(t *testing.T) {
+ tpS := &utils.TPStats{
+ TPid: "TPS1",
+ ID: "Stat1",
+ Filters: []*utils.TPRequestFilter{
+ &utils.TPRequestFilter{
+ Type: "*string",
+ FieldName: "Account",
+ Values: []string{"1002"},
+ },
+ },
+ ActivationInterval: &utils.TPActivationInterval{
+ ActivationTime: "2014-07-29T15:00:00Z",
+ ExpiryTime: "",
+ },
+ TTL: "1",
+ Metrics: []string{"MetricValue"},
+ Blocker: true,
+ Stored: true,
+ Weight: 20,
+ Thresholds: nil,
+ }
+ expectedSlc := [][]string{
+ []string{,"TPS1", "*Stat1", "*string", "*Account", "1002", "2014-07-29T15:00:00Z","","1","MetricValue",},
+ }
+ expectedtpS := APItoModelStats(tpS)
+ var slc [][]string
+ lc, err := csvDump(expectedtpS)
+ if err != nil {
+ t.Error("Error dumping to csv: ", err)
+ }
+ slc = append(slc, lc)
+
+ if !reflect.DeepEqual(expectedtpS, tpS) {
+ t.Errorf("Expecting: %+v, received: %+v", expectedtpS, slc)
+ }
+}
+
+*/
+
func TestTPRatingPlanAsExportSlice(t *testing.T) {
tpRpln := &utils.TPRatingPlan{
TPid: "TEST_TPID",
@@ -859,7 +901,7 @@ func TestAPItoTPStats(t *testing.T) {
Weight: 20.0,
}
- eTPs := &StatsQueue{ID: tps.ID,
+ eTPs := &StatsConfig{ID: tps.ID,
QueueLength: tps.QueueLength,
Metrics: []string{"*asr", "*acd", "*acc"},
Thresholds: []string{"THRESH1", "THRESH2"},
diff --git a/engine/storage_interface.go b/engine/storage_interface.go
index 812dc24be..51c74f70b 100755
--- a/engine/storage_interface.go
+++ b/engine/storage_interface.go
@@ -111,9 +111,10 @@ type DataDB interface {
GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
- GetStatsQueue(sqID string) (sq *StatsQueue, err error)
- SetStatsQueue(sq *StatsQueue) (err error)
- RemStatsQueue(sqID string) (err error)
+ //modicari
+ GetStatsConfig(sqID string) (sq *StatsConfig, err error)
+ SetStatsConfig(sq *StatsConfig) (err error)
+ RemStatsConfig(sqID string) (err error)
GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
RemSQStoredMetrics(sqID string) (err error)
diff --git a/engine/storage_map.go b/engine/storage_map.go
index 90a8ef489..820ad8405 100755
--- a/engine/storage_map.go
+++ b/engine/storage_map.go
@@ -1475,19 +1475,19 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) {
}
// GetStatsQueue retrieves a StatsQueue from dataDB
-func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
+func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
- key := utils.StatsQueuePrefix + sqID
+ key := utils.StatsConfigPrefix + sqID
values, ok := ms.dict[key]
if !ok {
return nil, utils.ErrNotFound
}
- err = ms.ms.Unmarshal(values, &sq)
+ err = ms.ms.Unmarshal(values, &scf)
if err != nil {
return nil, err
}
- for _, fltr := range sq.Filters {
+ for _, fltr := range scf.Filters {
if err := fltr.CompileValues(); err != nil {
return nil, err
}
@@ -1496,22 +1496,22 @@ func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
}
// SetStatsQueue stores a StatsQueue into DataDB
-func (ms *MapStorage) SetStatsQueue(sq *StatsQueue) (err error) {
+func (ms *MapStorage) SetStatsConfig(scf *StatsConfig) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- result, err := ms.ms.Marshal(sq)
+ result, err := ms.ms.Marshal(scf)
if err != nil {
return err
}
- ms.dict[utils.StatsQueuePrefix+sq.ID] = result
+ ms.dict[utils.StatsConfigPrefix+scf.ID] = result
return
}
// RemStatsQueue removes a StatsQueue from dataDB
-func (ms *MapStorage) RemStatsQueue(sqID string) (err error) {
+func (ms *MapStorage) RemStatsConfig(scfID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- key := utils.StatsQueuePrefix + sqID
+ key := utils.StatsConfigPrefix + scfID
delete(ms.dict, key)
return
}
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 45846614b..3a8fdae85 100755
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -180,7 +180,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
Sparse: false,
}
for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans,
- utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers} {
+ utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats} {
if err = db.C(col).EnsureIndex(idx); err != nil {
return
}
@@ -1985,10 +1985,10 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item
}
// GetStatsQueue retrieves a StatsQueue from dataDB
-func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
- session, col := ms.conn(utils.StatsQueuePrefix)
+func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
+ session, col := ms.conn(utils.StatsConfigPrefix)
defer session.Close()
- sq = new(StatsQueue)
+ sq = new(StatsConfig)
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
@@ -2004,16 +2004,16 @@ func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
}
// SetStatsQueue stores a StatsQueue into DataDB
-func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) {
- session, col := ms.conn(utils.StatsQueuePrefix)
+func (ms *MongoStorage) SetStatsConfig(sq *StatsConfig) (err error) {
+ session, col := ms.conn(utils.StatsConfigPrefix)
defer session.Close()
_, err = col.UpsertId(bson.M{"id": sq.ID}, sq)
return
}
// RemStatsQueue removes a StatsQueue from dataDB
-func (ms *MongoStorage) RemStatsQueue(sqID string) (err error) {
- session, col := ms.conn(utils.StatsQueuePrefix)
+func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) {
+ session, col := ms.conn(utils.StatsConfigPrefix)
err = col.Remove(bson.M{"id": sqID})
if err != nil {
return err
diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go
index bfc9658d0..8bdf92996 100755
--- a/engine/storage_mongo_stordb.go
+++ b/engine/storage_mongo_stordb.go
@@ -379,6 +379,23 @@ func (ms *MongoStorage) GetTPResourceLimits(tpid, id string) ([]*utils.TPResourc
return results, err
}
+func (ms *MongoStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) {
+ filter := bson.M{
+ "tpid": tpid,
+ }
+ if id != "" {
+ filter["id"] = id
+ }
+ var results []*utils.TPStats
+ session, col := ms.conn(utils.TBLTPStats)
+ defer session.Close()
+ err := col.Find(filter).All(&results)
+ if len(results) == 0 {
+ return results, utils.ErrNotFound
+ }
+ return results, err
+}
+
func (ms *MongoStorage) GetTPDerivedChargers(tp *utils.TPDerivedChargers) ([]*utils.TPDerivedChargers, error) {
filter := bson.M{"tpid": tp.TPid}
if tp.Direction != "" {
@@ -841,6 +858,20 @@ func (ms *MongoStorage) SetTPResourceLimits(tpRLs []*utils.TPResourceLimit) (err
return
}
+func (ms *MongoStorage) SetTPRStats(tpS []*utils.TPStats) (err error) {
+ if len(tpS) == 0 {
+ return
+ }
+ session, col := ms.conn(utils.TBLTPStats)
+ defer session.Close()
+ tx := col.Bulk()
+ for _, tp := range tpS {
+ tx.Upsert(bson.M{"tpid": tp.TPid, "id": tp.ID}, tp)
+ }
+ _, err = tx.Run()
+ return
+}
+
func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
if smc.CostDetails == nil {
return nil
@@ -1105,7 +1136,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
return cdrs, 0, nil
}
-func (ms *MongoStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) {
+func (ms *MongoStorage) GetTPStat(tpid, id string) ([]*utils.TPStats, error) {
filter := bson.M{
"tpid": tpid,
}
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index 282f435f6..022788c18 100755
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -1542,8 +1542,8 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
}
// GetStatsQueue retrieves a StatsQueue from dataDB
-func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
- key := utils.StatsQueuePrefix + sqID
+func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
+ key := utils.StatsConfigPrefix + sqID
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
@@ -1563,18 +1563,18 @@ func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
}
// SetStatsQueue stores a StatsQueue into DataDB
-func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) {
+func (rs *RedisStorage) SetStatsConfig(sq *StatsConfig) (err error) {
var result []byte
result, err = rs.ms.Marshal(sq)
if err != nil {
return
}
- return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err
+ return rs.Cmd("SET", utils.StatsConfigPrefix+sq.ID, result).Err
}
// RemStatsQueue removes a StatsQueue from dataDB
-func (rs *RedisStorage) RemStatsQueue(sqID string) (err error) {
- key := utils.StatsQueuePrefix + sqID
+func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) {
+ key := utils.StatsConfigPrefix + sqID
err = rs.Cmd("DEL", key).Err
return
}
diff --git a/engine/storage_sql.go b/engine/storage_sql.go
index 47a31a029..ec8f8d9f0 100755
--- a/engine/storage_sql.go
+++ b/engine/storage_sql.go
@@ -190,9 +190,7 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er
if len(table) == 0 { // Remove tpid out of all tables
for _, tblName := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPRateProfiles,
utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPLcrs, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPAccountActions,
- utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResourceLimits,
- // utils.TBLTPStats
- } {
+ utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResourceLimits, utils.TBLTPStats} {
if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil {
tx.Rollback()
return err
@@ -201,6 +199,7 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er
tx.Commit()
return nil
}
+ utils.Logger.Debug(fmt.Sprintf("#Rem sterge %s", tpid))
// Remove from a single table
tx = tx.Table(table).Where("tpid = ?", tpid)
// Compose filters
@@ -219,8 +218,10 @@ func (self *SQLStorage) SetTPTimings(timings []*utils.ApierTPTiming) error {
if len(timings) == 0 {
return nil
}
+
tx := self.db.Begin()
for _, timing := range timings {
+ utils.Logger.Debug(fmt.Sprintf("#1(set) Id care trimite %s", timing.ID))
if err := tx.Where(&TpTiming{Tpid: timing.TPid, Tag: timing.ID}).Delete(TpTiming{}).Error; err != nil {
tx.Rollback()
return err
@@ -1170,6 +1171,8 @@ func (self *SQLStorage) GetTPDestinationRates(tpid, id string, pagination *utils
func (self *SQLStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, error) {
var tpTimings TpTimings
q := self.db.Where("tpid = ?", tpid)
+ utils.Logger.Debug(fmt.Sprintf("#1 Id care trimite %s", id))
+ utils.Logger.Debug(fmt.Sprintf("#1 TPId care trimite %s", tpid))
if len(id) != 0 {
q = q.Where("tag = ?", id)
}
@@ -1177,6 +1180,7 @@ func (self *SQLStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e
return nil, err
}
ts := tpTimings.AsTPTimings()
+ utils.Logger.Debug(fmt.Sprintf("#2 ce gaseste : %s", ts))
if len(ts) == 0 {
return ts, utils.ErrNotFound
}
diff --git a/engine/tp_reader.go b/engine/tp_reader.go
index 0c3d00a9d..3c7e330d8 100755
--- a/engine/tp_reader.go
+++ b/engine/tp_reader.go
@@ -1955,7 +1955,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
- if err = tpr.dataStorage.SetStatsQueue(st); err != nil {
+ if err = tpr.dataStorage.SetStatsConfig(st); err != nil {
return err
}
if verbose {
diff --git a/stats/queue.go b/stats/queue.go
index 1ee38597c..7fd0c1fcb 100644
--- a/stats/queue.go
+++ b/stats/queue.go
@@ -49,7 +49,7 @@ func (sis StatsInstances) remWithID(qID string) {
// NewStatsInstance instantiates a StatsInstance
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
- sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
+ sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
for _, metricID := range sqCfg.Metrics {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
@@ -85,7 +85,7 @@ type StatsInstance struct {
sqItems []*engine.SQItem
sqMetrics map[string]StatsMetric
ms engine.Marshaler // used to get/set Metrics
- cfg *engine.StatsQueue
+ cfg *engine.StatsConfig
}
// GetSQStoredMetrics retrieves the data used for store to DB
diff --git a/stats/queue_test.go b/stats/queue_test.go
index 7735d3643..83505a914 100644
--- a/stats/queue_test.go
+++ b/stats/queue_test.go
@@ -26,17 +26,17 @@ import (
func TestStatsInstancesSort(t *testing.T) {
sInsts := StatsInstances{
- &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
}
sInsts.Sort()
eSInst := StatsInstances{
- &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}},
- &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
+ &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
}
if !reflect.DeepEqual(eSInst, sInsts) {
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)
diff --git a/stats/service.go b/stats/service.go
index 29f9ecebd..e119891e2 100755
--- a/stats/service.go
+++ b/stats/service.go
@@ -39,14 +39,14 @@ func init() {
func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
stopStoring: make(chan struct{}), evCache: NewStatsEventCache()}
- sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
+ sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
if err != nil {
return nil, err
}
ss.queuesCache = make(map[string]*StatsInstance)
ss.queues = make(StatsInstances, 0)
for _, prfx := range sqPrfxs {
- if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil {
+ if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>",
q.cfg.ID, err.Error()))
continue
@@ -92,7 +92,7 @@ func (ss *StatService) Shutdown() error {
// setQueue adds or modifies a queue into cache
// sort will reorder the ss.queues
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
- sq, err := ss.dataDB.GetStatsQueue(qID)
+ sq, err := ss.dataDB.GetStatsConfig(qID)
if err != nil {
return nil, err
}
@@ -225,13 +225,13 @@ type ArgsLoadQueues struct {
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
qIDs := args.QueueIDs
if qIDs == nil {
- sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
+ sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
if err != nil {
return err
}
queueIDs := make([]string, len(sqPrfxs))
for i, prfx := range sqPrfxs {
- queueIDs[i] = prfx[len(utils.StatsQueuePrefix):]
+ queueIDs[i] = prfx[len(utils.StatsConfigPrefix):]
}
if len(queueIDs) != 0 {
qIDs = &queueIDs
diff --git a/stats/service_test.go b/stats/service_test.go
index 940f752a9..b0b580017 100644
--- a/stats/service_test.go
+++ b/stats/service_test.go
@@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) {
config.SetCgrConfig(cgrCfg)
}
dataStorage, _ := engine.NewMapStorage()
- dataStorage.SetStatsQueue(
- &engine.StatsQueue{ID: "CDRST1",
+ dataStorage.SetStatsConfig(
+ &engine.StatsConfig{ID: "CDRST1",
Filters: []*engine.RequestFilter{
&engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant",
Values: []string{"cgrates.org"}}},
diff --git a/utils/consts.go b/utils/consts.go
index 451bdbf25..d2a6e03a8 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -251,7 +251,7 @@ const (
LOG_CDR = "cdr_"
LOG_MEDIATED_CDR = "mcd_"
SQStoredMetricsPrefix = "ssm_"
- StatsQueuePrefix = "stq_"
+ StatsConfigPrefix = "scf_"
ThresholdCfgPrefix = "thc_"
LOADINST_KEY = "load_history"
SESSION_MANAGER_SOURCE = "SMR"