diff --git a/apier/apier.go b/apier/apier.go
index 82451c5cc..0b7a3666e 100644
--- a/apier/apier.go
+++ b/apier/apier.go
@@ -36,14 +36,15 @@ const (
)
type ApierV1 struct {
- StorDb engine.LoadStorage
- RatingDb engine.RatingStorage
- AccountDb engine.AccountingStorage
- CdrDb engine.CdrStorage
- LogDb engine.LogStorage
- Sched *scheduler.Scheduler
- Config *config.CGRConfig
- Responder *engine.Responder
+ StorDb engine.LoadStorage
+ RatingDb engine.RatingStorage
+ AccountDb engine.AccountingStorage
+ CdrDb engine.CdrStorage
+ LogDb engine.LogStorage
+ Sched *scheduler.Scheduler
+ Config *config.CGRConfig
+ Responder *engine.Responder
+ CdrStatsSrv *engine.Stats
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
@@ -786,6 +787,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
self.Sched.LoadActionTimings(self.AccountDb)
self.Sched.Restart()
}
+ cstKeys, _ := loader.GetLoadedIds(engine.CDR_STATS_PREFIX)
+ if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
+ if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
+ return err
+ }
+ }
*reply = "OK"
return nil
}
diff --git a/apier/apier_local_test.go b/apier/apier_local_test.go
index 136e51b35..720e18ac9 100644
--- a/apier/apier_local_test.go
+++ b/apier/apier_local_test.go
@@ -31,7 +31,6 @@ import (
"path"
"reflect"
"sort"
- //"strings"
"testing"
"time"
@@ -1261,7 +1260,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) {
} else if reply != "OK" {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
- time.Sleep(100 * time.Millisecond) // Give time for scheduler to execute topups
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond)
}
func TestResetDataAfterLoadFromFolder(t *testing.T) {
diff --git a/apier/cdrstatsv1_local_test.go b/apier/cdrstatsv1_local_test.go
index 723f8e243..7cfd25bd8 100644
--- a/apier/cdrstatsv1_local_test.go
+++ b/apier/cdrstatsv1_local_test.go
@@ -18,31 +18,204 @@ along with this program. If not, see
package apier
-/*
-
import (
- "encoding/json"
- "flag"
"fmt"
- "net/http"
- "net/rpc"
- "net/rpc/jsonrpc"
- "net/url"
- "os"
- "os/exec"
- "path"
- "reflect"
- "sort"
- "testing"
- "time"
-
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "net/http"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "os/exec"
+ "path"
+ "reflect"
+ "testing"
+ "time"
)
+var cdrstCfgPath string
+var cdrstCfg *config.CGRConfig
+var cdrstRpc *rpc.Client
+
func init() {
- cfgPath = path.Join(*dataDir, "conf", "samples", "apier_local_test.cfg")
- cfg, _ = config.NewCGRConfigFromFile(&cfgPath)
+ cdrstCfgPath = path.Join(*dataDir, "conf", "samples", "cdrstatsv1_local_test.cfg")
+ cdrstCfg, _ = config.NewCGRConfigFromFile(&cfgPath)
+}
+
+func TestCDRStatsLclInitDataDb(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ ratingDb, err := engine.ConfigureRatingStorage(cdrstCfg.RatingDBType, cdrstCfg.RatingDBHost, cdrstCfg.RatingDBPort, cdrstCfg.RatingDBName,
+ cdrstCfg.RatingDBUser, cdrstCfg.RatingDBPass, cdrstCfg.DBDataEncoding)
+ if err != nil {
+ t.Fatal("Cannot connect to dataDb", err)
+ }
+ accountDb, err := engine.ConfigureAccountingStorage(cdrstCfg.AccountDBType, cdrstCfg.AccountDBHost, cdrstCfg.AccountDBPort, cdrstCfg.AccountDBName,
+ cdrstCfg.AccountDBUser, cdrstCfg.AccountDBPass, cdrstCfg.DBDataEncoding)
+ if err != nil {
+ t.Fatal("Cannot connect to dataDb", err)
+ }
+ for _, db := range []engine.Storage{ratingDb, accountDb} {
+ if err := db.Flush(); err != nil {
+ t.Fatal("Cannot reset dataDb", err)
+ }
+ }
+}
+
+func TestCDRStatsLclStartEngine(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ enginePath, err := exec.LookPath("cgr-engine")
+ if err != nil {
+ t.Fatal("Cannot find cgr-engine executable")
+ }
+ exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
+ engine := exec.Command(enginePath, "-config", cdrstCfgPath)
+ if err := engine.Start(); err != nil {
+ t.Fatal("Cannot start cgr-engine: ", err.Error())
+ }
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
+}
+
+// Connect rpc client to rater
+func TestCDRStatsLclRpcConn(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ var err error
+ cdrstRpc, err = jsonrpc.Dial("tcp", cdrstCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal("Could not connect to rater: ", err.Error())
+ }
+}
+
+func TestCDRStatsLclGetQueueIds(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ var queueIds []string
+ eQueueIds := []string{"*default"}
+ if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil {
+ t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error())
+ } else if !reflect.DeepEqual(eQueueIds, queueIds) {
+ t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
+ }
+}
+
+func TestCDRStatsLclLoadTariffPlanFromFolder(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ reply := ""
+ // Simple test that command is executed without errors
+ attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "cdrstats")}
+ if err := cdrstRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
+ t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error())
+ } else if reply != "OK" {
+ t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
+ }
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
+}
+
+func TestCDRStatsLclGetQueueIds2(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ var queueIds []string
+ eQueueIds := []string{"*default", "CDRST3", "CDRST4"}
+ if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil {
+ t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error())
+ } else if !reflect.DeepEqual(eQueueIds, queueIds) {
+ t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
+ }
+}
+
+func TestCDRStatsLclPostCdrs(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ httpClient := new(http.Client)
+ storedCdrs := []*utils.StoredCdr{
+ &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
+ ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
+ Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
+ AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
+ Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
+ Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
+ },
+ &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
+ ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
+ Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
+ AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
+ Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
+ },
+ &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
+ ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
+ Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
+ MediationRunId: utils.DEFAULT_RUNID,
+ Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
+ },
+ &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
+ ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
+ Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Time{},
+ MediationRunId: utils.DEFAULT_RUNID,
+ Usage: time.Duration(0) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
+ },
+ }
+ for _, storedCdr := range storedCdrs {
+ if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), storedCdr.AsHttpForm()); err != nil {
+ t.Error(err.Error())
+ }
+ }
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond)
+
+}
+
+func TestCDRStatsLclGetMetrics1(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ var rcvMetrics1 map[string]float64
+ expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15}
+ if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil {
+ t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
+ } else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) {
+ t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1)
+ }
+ var rcvMetrics2 map[string]float64
+ expectedMetrics2 := map[string]float64{"ASR": 75, "ACD": 15}
+ if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil {
+ t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
+ } else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) {
+ t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
+ }
+}
+
+func TestCDRStatsLclResetMetrics(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ var reply string
+ if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", AttrReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil {
+ t.Error("Calling CDRStatsV1.ResetQueues, got error: ", err.Error())
+ } else if reply != utils.OK {
+ t.Error("Unexpected reply received: ", reply)
+ }
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond)
+ var rcvMetrics1 map[string]float64
+ expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15}
+ if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil {
+ t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
+ } else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) {
+ t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1)
+ }
+ var rcvMetrics2 map[string]float64
+ expectedMetrics2 := map[string]float64{"ASR": 0, "ACD": 0}
+ if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil {
+ t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
+ } else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) {
+ t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
+ }
}
-*/
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 64bb155dd..85002e719 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -406,8 +406,17 @@ func main() {
stopHandled = true
}
+ if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier
+ cdrStats = engine.NewStats(ratingDb)
+ if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
+ cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil)
+ }
+ server.RpcRegister(cdrStats)
+ server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs
+ }
+
responder := &engine.Responder{ExitChan: exitChan}
- apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder}
+ apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL {
engine.Logger.Info("Registering Rater service")
@@ -462,16 +471,6 @@ func main() {
go startMediator(responder, logDb, cdrDb, cacheChan, medChan)
}
- if cfg.CDRStatsEnabled {
- cdrStats = engine.NewStats(ratingDb)
- if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
- var out int
- cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), &out)
- }
- server.RpcRegister(cdrStats)
- server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs
- }
-
var cdrsChan chan struct{}
if cfg.CDRSEnabled {
engine.Logger.Info("Starting CGRateS CDRS service.")
diff --git a/data/conf/samples/cdrstatsv1_local_test.cfg b/data/conf/samples/cdrstatsv1_local_test.cfg
index 415eee300..040b13ae3 100644
--- a/data/conf/samples/cdrstatsv1_local_test.cfg
+++ b/data/conf/samples/cdrstatsv1_local_test.cfg
@@ -4,17 +4,23 @@
# This file contains the default configuration hardcoded into CGRateS.
# This is what you get when you load CGRateS with an empty configuration file.
+[global]
+rpc_json_listen =:2012
+
+
[rater]
-enabled = true # Enable RaterCDRSExportPath service: .
+enabled = true
[cdrs]
-enabled = true # Start the CDR Server service: .
-mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
+enabled = true
+mediator = internal
+store_disable = true
[mediator]
-enabled = true # Starts Mediator service: .
+enabled = true
+store_disable = true
[cdrstats]
-enabled = true # Starts the cdrstats service:
+enabled = true
queue_length = 5 # Maximum number of items in the stats buffer
time_window = 0 # Queue is not affected by the SetupTime
diff --git a/engine/loader_csv.go b/engine/loader_csv.go
index 5913c85ab..b0acdaf84 100644
--- a/engine/loader_csv.go
+++ b/engine/loader_csv.go
@@ -1,14 +1,14 @@
/*
-Rating system designed to be used in VoIP Carriers World
-Copyright (C) 2013 ITsysCOM
+Real-time Charging System for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
-This program is free software: you can redistribute it and/or modify
+This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
+but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
diff --git a/engine/stats.go b/engine/stats.go
index 3678f29d8..4ab935cf3 100644
--- a/engine/stats.go
+++ b/engine/stats.go
@@ -206,5 +206,5 @@ func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error {
}
func (ps *ProxyStats) ResetQueues(ids []string, out *int) error {
- return ps.Client.Call("Stats.ReserQueues", ids, out)
+ return ps.Client.Call("Stats.ResetQueues", ids, out)
}