Adding local_test for mediator rpc method

This commit is contained in:
DanB
2014-02-10 18:31:27 +01:00
parent b91fc4ca21
commit 944262ccff
8 changed files with 204 additions and 29 deletions

View File

@@ -530,13 +530,9 @@ func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge)
return nil
}
type AttrLoadTPFromFolder struct {
FolderPath string // Take files from folder absolute path
DryRun bool // Do not write to database but parse only
FlushDb bool // Flush previous data before loading new one
}
func (self *ApierV1) LoadTariffPlanFromFolder(attrs AttrLoadTPFromFolder, reply *string) error {
func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *string) error {
loader := engine.NewFileCSVReader(self.RatingDb, self.AccountDb, utils.CSV_SEP,
path.Join(attrs.FolderPath, utils.DESTINATIONS_CSV),
path.Join(attrs.FolderPath, utils.TIMINGS_CSV),

View File

@@ -1181,7 +1181,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) {
}
reply := ""
// Simple test that command is executed without errors
attrs := &AttrLoadTPFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")}
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")}
if err := rater.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error())
} else if reply != "OK" {

View File

@@ -123,19 +123,6 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
close(chanDone)
}
// In case of internal mediator apier needs to wait for it to initialize before offering it's methods
func registerApier(waitOnChans []chan struct{}) {
for _, chn := range waitOnChans {
select {
case <-time.After(5 * time.Minute):
engine.Logger.Crit(fmt.Sprintf("<Apier> Timeout waiting for dependecies to start."))
exitChan <- true
return
case <-chn:
}
}
}
func startCdrc(cdrsChan chan struct{}) {
if cfg.CdrcCdrs == utils.INTERNAL {
<-cdrsChan // Wait for CDRServer to come up before start processing

View File

@@ -1,8 +1,7 @@
# CGRateS Sample Configuration file
# CGRateS Configuration file
#
# This file contains the default configuration hardcoded into CGRateS.
# This is what you get when you load CGRateS with an empty configuration file.
# [global] must exist in all files, rest of the configuration is inter-changeable.
# Used in mediator_local_test
# Starts rater, cdrs and mediator connecting over internal channel
[global]
# ratingdb_type = redis # Rating subsystem database: <redis>.
@@ -38,16 +37,16 @@
# enabled = false # Start Balancer service: <true|false>.
[rater]
# enabled = false # Enable RaterCDRSExportPath service: <true|false>.
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>.
[scheduler]
# enabled = false # Starts Scheduler service: <true|false>.
[cdrs]
# enabled = false # Start the CDR Server service: <true|false>.
enabled = true # Start the CDR Server service: <true|false>.
# extra_fields = # Extra fields to store in CDRs
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
[cdre]
# cdr_format = csv # Exported CDRs format <csv>
@@ -76,8 +75,8 @@
# extra_fields = # Extra fields identifiers. For .csv, format: <label_extrafield_1>:<index_extrafield_1>[...,<label_extrafield_n>:<index_extrafield_n>]
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
# rater = internal # Address where to reach the Rater: <internal|x.y.z.y:1234>
enabled = true # Starts Mediator service: <true|false>.
rater = internal # Address where to reach the Rater: <internal|x.y.z.y:1234>
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# run_ids = # Identifiers of each extra mediation to run on CDRs
# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs.

View File

@@ -0,0 +1,185 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package mediator
import (
"flag"
"fmt"
"net/http"
"net/rpc"
"net/url"
"os/exec"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
/*
README:
Enable local tests by passing '-local' to the go test command
It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
Prior running the tests, create database and users by running:
mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
What these tests do:
* Flush tables in storDb to start clean.
* Start engine with default configuration and give it some time to listen (here caching can slow down, hence the command argument parameter).
* Connect rpc client depending on encoding defined in configuration.
* Execute remote Apis and test their replies(follow prepaid1cent scenario so we can test load in dataDb also).
*/
var cfg *config.CGRConfig
var cgrRpc *rpc.Client
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var storDbType = flag.String("stordb_type", utils.MYSQL, "The type of the storDb database <mysql>")
var startDelay = flag.Int("delay_start", 300, "Number of miliseconds to it for rater to start and cache")
func init() {
cfgPath := path.Join(*dataDir, "conf", "samples", "mediator_test1.cfg")
cfg, _ = config.NewCGRConfig(&cfgPath)
}
func TestInitRatingDb(t *testing.T) {
if !*testLocal {
return
}
ratingDb, err := engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
if err != nil {
t.Fatal("Cannot connect to dataDb", err)
}
if err := ratingDb.Flush(); err != nil {
t.Fatal("Cannot reset dataDb", err)
}
}
// Empty tables before using them
func TestInitStorDb(t *testing.T) {
if !*testLocal {
return
}
if *storDbType != utils.MYSQL {
t.Fatal("Unsupported storDbType")
}
var mysql *engine.MySQLStorage
if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
t.Fatal("Error on opening database connection: ", err)
} else {
mysql = d.(*engine.MySQLStorage)
}
for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL} {
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil {
t.Fatal("Error on mysql creation: ", err.Error())
return // No point in going further
}
}
for _, tbl := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA} {
if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", tbl)); err != nil {
t.Fatal(err.Error())
}
}
}
// Finds cgr-engine executable and starts it with default configuration
func TestStartEngine(t *testing.T) {
if !*testLocal {
return
}
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
t.Fatal("Cannot find cgr-engine executable")
}
exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
engine := exec.Command(enginePath, "-rater", "-scheduler", "-cdrs", "-mediator", "-config", path.Join(*dataDir, "conf", "cgrates.cfg"))
if err := engine.Start(); err != nil {
t.Fatal("Cannot start cgr-engine: ", err.Error())
}
time.Sleep(time.Duration(*startDelay) * time.Millisecond) // Give time to rater to fire up
}
// Connect rpc client
func TestRpcConn(t *testing.T) {
if !*testLocal {
return
}
var err error
cgrRpc, err = rpc.Dial("tcp", cfg.RPCGOBListen) //ToDo: Fix with automatic config
if err != nil {
t.Fatal("Could not connect to CGR GOB-RPC Server: ", err.Error())
}
}
// Test here LoadTariffPlanFromFolder
func TestLoadTariffPlanFromFolder(t *testing.T) {
if !*testLocal {
return
}
reply := ""
// Simple test that command is executed without errors
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")}
if err := cgrRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error())
} else if reply != utils.OK {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
}
func TestPostCdrs(t *testing.T) {
if !*testLocal {
return
}
httpClient := new(http.Client)
cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"},
"tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"},
"answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"},
"tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"},
"answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} {
cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL)
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.HTTPListen), cdrForm); err != nil {
t.Error(err.Error())
}
}
}
func TestRateCdrs(t *testing.T) {
if !*testLocal {
return
}
var reply string
if err := cgrRpc.Call("MediatorV1.RateCdrs", utils.AttrRateCdrs{}, &reply); err != nil {
t.Error(err.Error())
} else if reply != utils.OK {
t.Errorf("Unexpected reply: %s", reply)
}
}
// Simply kill the engine after we are done with tests within this file
func TestStopEngine(t *testing.T) {
if !*testLocal {
return
}
exec.Command("pkill", "cgr-engine").Run()
}

View File

@@ -48,5 +48,6 @@ func (self *MediatorV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error
if err := self.Medi.RateCdrs(tStart, tEnd, attrs.RerateErrors, attrs.RerateRated); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
*reply = utils.OK
return nil
}

View File

@@ -325,3 +325,9 @@ type AttrRateCdrs struct {
RerateErrors bool // Rerate previous CDRs with errors (makes sense for reqtype rated and pseudoprepaid
RerateRated bool // Rerate CDRs which were previously rated (makes sense for reqtype rated and pseudoprepaid)
}
type AttrLoadTpFromFolder struct {
FolderPath string // Take files from folder absolute path
DryRun bool // Do not write to database but parse only
FlushDb bool // Flush previous data before loading new one
}

View File

@@ -83,6 +83,7 @@ const (
CDRE_DRYRUN = "dry_run"
INTERNAL = "internal"
ZERO_RATING_SUBJECT_PREFIX = "*zero"
OK = "OK"
)
var (