Local tests for CDRStatsV1

This commit is contained in:
DanB
2014-08-02 17:19:20 +02:00
parent bcf4c3d08e
commit f2efe6fe9a
7 changed files with 234 additions and 50 deletions

View File

@@ -36,14 +36,15 @@ const (
)
type ApierV1 struct {
StorDb engine.LoadStorage
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
LogDb engine.LogStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
Responder *engine.Responder
StorDb engine.LoadStorage
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
LogDb engine.LogStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
Responder *engine.Responder
CdrStatsSrv *engine.Stats
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
@@ -786,6 +787,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
self.Sched.LoadActionTimings(self.AccountDb)
self.Sched.Restart()
}
cstKeys, _ := loader.GetLoadedIds(engine.CDR_STATS_PREFIX)
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
return err
}
}
*reply = "OK"
return nil
}

View File

@@ -31,7 +31,6 @@ import (
"path"
"reflect"
"sort"
//"strings"
"testing"
"time"
@@ -1261,7 +1260,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) {
} else if reply != "OK" {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
time.Sleep(100 * time.Millisecond) // Give time for scheduler to execute topups
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
}
func TestResetDataAfterLoadFromFolder(t *testing.T) {

View File

@@ -18,31 +18,204 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package apier
/*
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"net/url"
"os"
"os/exec"
"path"
"reflect"
"sort"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"os/exec"
"path"
"reflect"
"testing"
"time"
)
var cdrstCfgPath string
var cdrstCfg *config.CGRConfig
var cdrstRpc *rpc.Client
func init() {
cfgPath = path.Join(*dataDir, "conf", "samples", "apier_local_test.cfg")
cfg, _ = config.NewCGRConfigFromFile(&cfgPath)
cdrstCfgPath = path.Join(*dataDir, "conf", "samples", "cdrstatsv1_local_test.cfg")
cdrstCfg, _ = config.NewCGRConfigFromFile(&cfgPath)
}
func TestCDRStatsLclInitDataDb(t *testing.T) {
if !*testLocal {
return
}
ratingDb, err := engine.ConfigureRatingStorage(cdrstCfg.RatingDBType, cdrstCfg.RatingDBHost, cdrstCfg.RatingDBPort, cdrstCfg.RatingDBName,
cdrstCfg.RatingDBUser, cdrstCfg.RatingDBPass, cdrstCfg.DBDataEncoding)
if err != nil {
t.Fatal("Cannot connect to dataDb", err)
}
accountDb, err := engine.ConfigureAccountingStorage(cdrstCfg.AccountDBType, cdrstCfg.AccountDBHost, cdrstCfg.AccountDBPort, cdrstCfg.AccountDBName,
cdrstCfg.AccountDBUser, cdrstCfg.AccountDBPass, cdrstCfg.DBDataEncoding)
if err != nil {
t.Fatal("Cannot connect to dataDb", err)
}
for _, db := range []engine.Storage{ratingDb, accountDb} {
if err := db.Flush(); err != nil {
t.Fatal("Cannot reset dataDb", err)
}
}
}
func TestCDRStatsLclStartEngine(t *testing.T) {
if !*testLocal {
return
}
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
t.Fatal("Cannot find cgr-engine executable")
}
exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
engine := exec.Command(enginePath, "-config", cdrstCfgPath)
if err := engine.Start(); err != nil {
t.Fatal("Cannot start cgr-engine: ", err.Error())
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
}
// Connect rpc client to rater
func TestCDRStatsLclRpcConn(t *testing.T) {
if !*testLocal {
return
}
var err error
cdrstRpc, err = jsonrpc.Dial("tcp", cdrstCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func TestCDRStatsLclGetQueueIds(t *testing.T) {
if !*testLocal {
return
}
var queueIds []string
eQueueIds := []string{"*default"}
if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil {
t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error())
} else if !reflect.DeepEqual(eQueueIds, queueIds) {
t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
}
}
func TestCDRStatsLclLoadTariffPlanFromFolder(t *testing.T) {
if !*testLocal {
return
}
reply := ""
// Simple test that command is executed without errors
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "cdrstats")}
if err := cdrstRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error())
} else if reply != "OK" {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
func TestCDRStatsLclGetQueueIds2(t *testing.T) {
if !*testLocal {
return
}
var queueIds []string
eQueueIds := []string{"*default", "CDRST3", "CDRST4"}
if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil {
t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error())
} else if !reflect.DeepEqual(eQueueIds, queueIds) {
t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
}
}
func TestCDRStatsLclPostCdrs(t *testing.T) {
if !*testLocal {
return
}
httpClient := new(http.Client)
storedCdrs := []*utils.StoredCdr{
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
},
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
},
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
MediationRunId: utils.DEFAULT_RUNID,
Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
},
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test",
ReqType: "rated", Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Time{},
MediationRunId: utils.DEFAULT_RUNID,
Usage: time.Duration(0) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan",
},
}
for _, storedCdr := range storedCdrs {
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), storedCdr.AsHttpForm()); err != nil {
t.Error(err.Error())
}
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
}
func TestCDRStatsLclGetMetrics1(t *testing.T) {
if !*testLocal {
return
}
var rcvMetrics1 map[string]float64
expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15}
if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil {
t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
} else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1)
}
var rcvMetrics2 map[string]float64
expectedMetrics2 := map[string]float64{"ASR": 75, "ACD": 15}
if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil {
t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
} else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
}
}
func TestCDRStatsLclResetMetrics(t *testing.T) {
if !*testLocal {
return
}
var reply string
if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", AttrReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil {
t.Error("Calling CDRStatsV1.ResetQueues, got error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var rcvMetrics1 map[string]float64
expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15}
if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil {
t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
} else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1)
}
var rcvMetrics2 map[string]float64
expectedMetrics2 := map[string]float64{"ASR": 0, "ACD": 0}
if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil {
t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
} else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
}
}
*/

View File

@@ -406,8 +406,17 @@ func main() {
stopHandled = true
}
if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier
cdrStats = engine.NewStats(ratingDb)
if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil)
}
server.RpcRegister(cdrStats)
server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs
}
responder := &engine.Responder{ExitChan: exitChan}
apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder}
apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL {
engine.Logger.Info("Registering Rater service")
@@ -462,16 +471,6 @@ func main() {
go startMediator(responder, logDb, cdrDb, cacheChan, medChan)
}
if cfg.CDRStatsEnabled {
cdrStats = engine.NewStats(ratingDb)
if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 {
var out int
cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), &out)
}
server.RpcRegister(cdrStats)
server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs
}
var cdrsChan chan struct{}
if cfg.CDRSEnabled {
engine.Logger.Info("Starting CGRateS CDRS service.")

View File

@@ -4,17 +4,23 @@
# This file contains the default configuration hardcoded into CGRateS.
# This is what you get when you load CGRateS with an empty configuration file.
[global]
rpc_json_listen =:2012
[rater]
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
enabled = true
[cdrs]
enabled = true # Start the CDR Server service: <true|false>.
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
enabled = true
mediator = internal
store_disable = true
[mediator]
enabled = true # Starts Mediator service: <true|false>.
enabled = true
store_disable = true
[cdrstats]
enabled = true # Starts the cdrstats service: <true|false>
enabled = true
queue_length = 5 # Maximum number of items in the stats buffer
time_window = 0 # Queue is not affected by the SetupTime

View File

@@ -1,14 +1,14 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

View File

@@ -206,5 +206,5 @@ func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error {
}
func (ps *ProxyStats) ResetQueues(ids []string, out *int) error {
return ps.Client.Call("Stats.ReserQueues", ids, out)
return ps.Client.Call("Stats.ResetQueues", ids, out)
}