From f2efe6fe9ad625951fde5d53d90d3149bc889767 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 2 Aug 2014 17:19:20 +0200 Subject: [PATCH] Local tests for CDRStatsV1 --- apier/apier.go | 23 ++- apier/apier_local_test.go | 3 +- apier/cdrstatsv1_local_test.go | 211 ++++++++++++++++++-- cmd/cgr-engine/cgr-engine.go | 21 +- data/conf/samples/cdrstatsv1_local_test.cfg | 16 +- engine/loader_csv.go | 8 +- engine/stats.go | 2 +- 7 files changed, 234 insertions(+), 50 deletions(-) diff --git a/apier/apier.go b/apier/apier.go index 82451c5cc..0b7a3666e 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -36,14 +36,15 @@ const ( ) type ApierV1 struct { - StorDb engine.LoadStorage - RatingDb engine.RatingStorage - AccountDb engine.AccountingStorage - CdrDb engine.CdrStorage - LogDb engine.LogStorage - Sched *scheduler.Scheduler - Config *config.CGRConfig - Responder *engine.Responder + StorDb engine.LoadStorage + RatingDb engine.RatingStorage + AccountDb engine.AccountingStorage + CdrDb engine.CdrStorage + LogDb engine.LogStorage + Sched *scheduler.Scheduler + Config *config.CGRConfig + Responder *engine.Responder + CdrStatsSrv *engine.Stats } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { @@ -786,6 +787,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, self.Sched.LoadActionTimings(self.AccountDb) self.Sched.Restart() } + cstKeys, _ := loader.GetLoadedIds(engine.CDR_STATS_PREFIX) + if len(cstKeys) != 0 && self.CdrStatsSrv != nil { + if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { + return err + } + } *reply = "OK" return nil } diff --git a/apier/apier_local_test.go b/apier/apier_local_test.go index 136e51b35..720e18ac9 100644 --- a/apier/apier_local_test.go +++ b/apier/apier_local_test.go @@ -31,7 +31,6 @@ import ( "path" "reflect" "sort" - //"strings" "testing" "time" @@ -1261,7 +1260,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) { } else if reply != "OK" { t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply) } - time.Sleep(100 * time.Millisecond) // Give time for scheduler to execute topups + time.Sleep(time.Duration(*waitRater) * time.Millisecond) } func TestResetDataAfterLoadFromFolder(t *testing.T) { diff --git a/apier/cdrstatsv1_local_test.go b/apier/cdrstatsv1_local_test.go index 723f8e243..7cfd25bd8 100644 --- a/apier/cdrstatsv1_local_test.go +++ b/apier/cdrstatsv1_local_test.go @@ -18,31 +18,204 @@ along with this program. If not, see package apier -/* - import ( - "encoding/json" - "flag" "fmt" - "net/http" - "net/rpc" - "net/rpc/jsonrpc" - "net/url" - "os" - "os/exec" - "path" - "reflect" - "sort" - "testing" - "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "net/http" + "net/rpc" + "net/rpc/jsonrpc" + "os/exec" + "path" + "reflect" + "testing" + "time" ) +var cdrstCfgPath string +var cdrstCfg *config.CGRConfig +var cdrstRpc *rpc.Client + func init() { - cfgPath = path.Join(*dataDir, "conf", "samples", "apier_local_test.cfg") - cfg, _ = config.NewCGRConfigFromFile(&cfgPath) + cdrstCfgPath = path.Join(*dataDir, "conf", "samples", "cdrstatsv1_local_test.cfg") + cdrstCfg, _ = config.NewCGRConfigFromFile(&cfgPath) +} + +func TestCDRStatsLclInitDataDb(t *testing.T) { + if !*testLocal { + return + } + ratingDb, err := engine.ConfigureRatingStorage(cdrstCfg.RatingDBType, cdrstCfg.RatingDBHost, cdrstCfg.RatingDBPort, cdrstCfg.RatingDBName, + cdrstCfg.RatingDBUser, cdrstCfg.RatingDBPass, cdrstCfg.DBDataEncoding) + if err != nil { + t.Fatal("Cannot connect to dataDb", err) + } + accountDb, err := engine.ConfigureAccountingStorage(cdrstCfg.AccountDBType, cdrstCfg.AccountDBHost, cdrstCfg.AccountDBPort, cdrstCfg.AccountDBName, + cdrstCfg.AccountDBUser, cdrstCfg.AccountDBPass, cdrstCfg.DBDataEncoding) + if err != nil { + t.Fatal("Cannot connect to dataDb", err) + } + for _, db := range []engine.Storage{ratingDb, accountDb} { + if err := db.Flush(); err != nil { + t.Fatal("Cannot reset dataDb", err) + } + } +} + +func TestCDRStatsLclStartEngine(t *testing.T) { + if !*testLocal { + return + } + enginePath, err := exec.LookPath("cgr-engine") + if err != nil { + t.Fatal("Cannot find cgr-engine executable") + } + exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it + engine := exec.Command(enginePath, "-config", cdrstCfgPath) + if err := engine.Start(); err != nil { + t.Fatal("Cannot start cgr-engine: ", err.Error()) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up +} + +// Connect rpc client to rater +func TestCDRStatsLclRpcConn(t *testing.T) { + if !*testLocal { + return + } + var err error + cdrstRpc, err = jsonrpc.Dial("tcp", cdrstCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func TestCDRStatsLclGetQueueIds(t *testing.T) { + if !*testLocal { + return + } + var queueIds []string + eQueueIds := []string{"*default"} + if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil { + t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error()) + } else if !reflect.DeepEqual(eQueueIds, queueIds) { + t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds) + } +} + +func TestCDRStatsLclLoadTariffPlanFromFolder(t *testing.T) { + if !*testLocal { + return + } + reply := "" + // Simple test that command is executed without errors + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "cdrstats")} + if err := cdrstRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error()) + } else if reply != "OK" { + t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestCDRStatsLclGetQueueIds2(t *testing.T) { + if !*testLocal { + return + } + var queueIds []string + eQueueIds := []string{"*default", "CDRST3", "CDRST4"} + if err := cdrstRpc.Call("CDRStatsV1.GetQueueIds", "", &queueIds); err != nil { + t.Error("Calling CDRStatsV1.GetQueueIds, got error: ", err.Error()) + } else if !reflect.DeepEqual(eQueueIds, queueIds) { + t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds) + } +} + +func TestCDRStatsLclPostCdrs(t *testing.T) { + if !*testLocal { + return + } + httpClient := new(http.Client) + storedCdrs := []*utils.StoredCdr{ + &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", + ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID, + Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan", + }, + &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", + ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID, + Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan", + }, + &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", + ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + MediationRunId: utils.DEFAULT_RUNID, + Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan", + }, + &utils.StoredCdr{CgrId: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", + ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Time{}, + MediationRunId: utils.DEFAULT_RUNID, + Usage: time.Duration(0) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dan", + }, + } + for _, storedCdr := range storedCdrs { + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), storedCdr.AsHttpForm()); err != nil { + t.Error(err.Error()) + } + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) + +} + +func TestCDRStatsLclGetMetrics1(t *testing.T) { + if !*testLocal { + return + } + var rcvMetrics1 map[string]float64 + expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15} + if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil { + t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error()) + } else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) { + t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1) + } + var rcvMetrics2 map[string]float64 + expectedMetrics2 := map[string]float64{"ASR": 75, "ACD": 15} + if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil { + t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error()) + } else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) { + t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2) + } +} + +func TestCDRStatsLclResetMetrics(t *testing.T) { + if !*testLocal { + return + } + var reply string + if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", AttrReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil { + t.Error("Calling CDRStatsV1.ResetQueues, got error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) + var rcvMetrics1 map[string]float64 + expectedMetrics1 := map[string]float64{"ASR": 75, "ACD": 15, "ACC": 15} + if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "*default"}, &rcvMetrics1); err != nil { + t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error()) + } else if !reflect.DeepEqual(expectedMetrics1, rcvMetrics1) { + t.Errorf("Expecting: %v, received: %v", expectedMetrics1, rcvMetrics1) + } + var rcvMetrics2 map[string]float64 + expectedMetrics2 := map[string]float64{"ASR": 0, "ACD": 0} + if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics2); err != nil { + t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error()) + } else if !reflect.DeepEqual(expectedMetrics2, rcvMetrics2) { + t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2) + } } -*/ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 64bb155dd..85002e719 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -406,8 +406,17 @@ func main() { stopHandled = true } + if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier + cdrStats = engine.NewStats(ratingDb) + if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 { + cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), nil) + } + server.RpcRegister(cdrStats) + server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs + } + responder := &engine.Responder{ExitChan: exitChan} - apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder} + apierRpc := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats} if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { engine.Logger.Info("Registering Rater service") @@ -462,16 +471,6 @@ func main() { go startMediator(responder, logDb, cdrDb, cacheChan, medChan) } - if cfg.CDRStatsEnabled { - cdrStats = engine.NewStats(ratingDb) - if cfg.CDRStatConfig != nil && len(cfg.CDRStatConfig.Metrics) != 0 { - var out int - cdrStats.AddQueue(engine.NewCdrStatsFromCdrStatsCfg(cfg.CDRStatConfig), &out) - } - server.RpcRegister(cdrStats) - server.RpcRegister(&apier.CDRStatsV1{cdrStats}) // Public APIs - } - var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") diff --git a/data/conf/samples/cdrstatsv1_local_test.cfg b/data/conf/samples/cdrstatsv1_local_test.cfg index 415eee300..040b13ae3 100644 --- a/data/conf/samples/cdrstatsv1_local_test.cfg +++ b/data/conf/samples/cdrstatsv1_local_test.cfg @@ -4,17 +4,23 @@ # This file contains the default configuration hardcoded into CGRateS. # This is what you get when you load CGRateS with an empty configuration file. +[global] +rpc_json_listen =:2012 + + [rater] -enabled = true # Enable RaterCDRSExportPath service: . +enabled = true [cdrs] -enabled = true # Start the CDR Server service: . -mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +enabled = true +mediator = internal +store_disable = true [mediator] -enabled = true # Starts Mediator service: . +enabled = true +store_disable = true [cdrstats] -enabled = true # Starts the cdrstats service: +enabled = true queue_length = 5 # Maximum number of items in the stats buffer time_window = 0 # Queue is not affected by the SetupTime diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 5913c85ab..b0acdaf84 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -1,14 +1,14 @@ /* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2013 ITsysCOM +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH -This program is free software: you can redistribute it and/or modify +This program is free software: you can Storagetribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of +but WITH*out ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. diff --git a/engine/stats.go b/engine/stats.go index 3678f29d8..4ab935cf3 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -206,5 +206,5 @@ func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error { } func (ps *ProxyStats) ResetQueues(ids []string, out *int) error { - return ps.Client.Call("Stats.ReserQueues", ids, out) + return ps.Client.Call("Stats.ResetQueues", ids, out) }