From 944262ccff9907768b6767d69e46b984d88e8ea6 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 10 Feb 2014 18:31:27 +0100 Subject: [PATCH 1/3] Adding local_test for mediator rpc method --- apier/apier.go | 8 +- apier/apier_local_test.go | 2 +- cmd/cgr-engine/cgr-engine.go | 13 -- .../{cgrates.cfg => mediator_test1.cfg} | 17 +- mediator/mediator_local_test.go | 185 ++++++++++++++++++ mediator/mediator_rpc.go | 1 + utils/apitpdata.go | 6 + utils/consts.go | 1 + 8 files changed, 204 insertions(+), 29 deletions(-) rename data/conf/samples/{cgrates.cfg => mediator_test1.cfg} (91%) create mode 100644 mediator/mediator_local_test.go diff --git a/apier/apier.go b/apier/apier.go index 9ab9d216f..9d73c2c28 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -530,13 +530,9 @@ func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge) return nil } -type AttrLoadTPFromFolder struct { - FolderPath string // Take files from folder absolute path - DryRun bool // Do not write to database but parse only - FlushDb bool // Flush previous data before loading new one -} -func (self *ApierV1) LoadTariffPlanFromFolder(attrs AttrLoadTPFromFolder, reply *string) error { + +func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *string) error { loader := engine.NewFileCSVReader(self.RatingDb, self.AccountDb, utils.CSV_SEP, path.Join(attrs.FolderPath, utils.DESTINATIONS_CSV), path.Join(attrs.FolderPath, utils.TIMINGS_CSV), diff --git a/apier/apier_local_test.go b/apier/apier_local_test.go index a9786470c..d49eb9aa3 100644 --- a/apier/apier_local_test.go +++ b/apier/apier_local_test.go @@ -1181,7 +1181,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) { } reply := "" // Simple test that command is executed without errors - attrs := &AttrLoadTPFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")} + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")} if err := rater.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error()) } else if reply != "OK" { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index dc49a818c..743ee40de 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -123,19 +123,6 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD close(chanDone) } -// In case of internal mediator apier needs to wait for it to initialize before offering it's methods -func registerApier(waitOnChans []chan struct{}) { - for _, chn := range waitOnChans { - select { - case <-time.After(5 * time.Minute): - engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for dependecies to start.")) - exitChan <- true - return - case <-chn: - } - } -} - func startCdrc(cdrsChan chan struct{}) { if cfg.CdrcCdrs == utils.INTERNAL { <-cdrsChan // Wait for CDRServer to come up before start processing diff --git a/data/conf/samples/cgrates.cfg b/data/conf/samples/mediator_test1.cfg similarity index 91% rename from data/conf/samples/cgrates.cfg rename to data/conf/samples/mediator_test1.cfg index f545ecbc4..5dfb040e2 100644 --- a/data/conf/samples/cgrates.cfg +++ b/data/conf/samples/mediator_test1.cfg @@ -1,8 +1,7 @@ -# CGRateS Sample Configuration file +# CGRateS Configuration file # -# This file contains the default configuration hardcoded into CGRateS. -# This is what you get when you load CGRateS with an empty configuration file. -# [global] must exist in all files, rest of the configuration is inter-changeable. +# Used in mediator_local_test +# Starts rater, cdrs and mediator connecting over internal channel [global] # ratingdb_type = redis # Rating subsystem database: . @@ -38,16 +37,16 @@ # enabled = false # Start Balancer service: . [rater] -# enabled = false # Enable RaterCDRSExportPath service: . +enabled = true # Enable RaterCDRSExportPath service: . # balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] # enabled = false # Starts Scheduler service: . [cdrs] -# enabled = false # Start the CDR Server service: . +enabled = true # Start the CDR Server service: . # extra_fields = # Extra fields to store in CDRs -# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] # cdr_format = csv # Exported CDRs format @@ -76,8 +75,8 @@ # extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] -# enabled = false # Starts Mediator service: . -# rater = internal # Address where to reach the Rater: +enabled = true # Starts Mediator service: . +rater = internal # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. # run_ids = # Identifiers of each extra mediation to run on CDRs # reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. diff --git a/mediator/mediator_local_test.go b/mediator/mediator_local_test.go new file mode 100644 index 000000000..7f86dad61 --- /dev/null +++ b/mediator/mediator_local_test.go @@ -0,0 +1,185 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package mediator + +import ( + "flag" + "fmt" + "net/http" + "net/rpc" + "net/url" + "os/exec" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +/* +README: + + Enable local tests by passing '-local' to the go test command + It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments. + Prior running the tests, create database and users by running: + mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql + What these tests do: + * Flush tables in storDb to start clean. + * Start engine with default configuration and give it some time to listen (here caching can slow down, hence the command argument parameter). + * Connect rpc client depending on encoding defined in configuration. + * Execute remote Apis and test their replies(follow prepaid1cent scenario so we can test load in dataDb also). +*/ + +var cfg *config.CGRConfig +var cgrRpc *rpc.Client + +var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args +var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") +var storDbType = flag.String("stordb_type", utils.MYSQL, "The type of the storDb database ") +var startDelay = flag.Int("delay_start", 300, "Number of miliseconds to it for rater to start and cache") + +func init() { + cfgPath := path.Join(*dataDir, "conf", "samples", "mediator_test1.cfg") + cfg, _ = config.NewCGRConfig(&cfgPath) +} + +func TestInitRatingDb(t *testing.T) { + if !*testLocal { + return + } + ratingDb, err := engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding) + if err != nil { + t.Fatal("Cannot connect to dataDb", err) + } + if err := ratingDb.Flush(); err != nil { + t.Fatal("Cannot reset dataDb", err) + } +} + +// Empty tables before using them +func TestInitStorDb(t *testing.T) { + if !*testLocal { + return + } + if *storDbType != utils.MYSQL { + t.Fatal("Unsupported storDbType") + } + var mysql *engine.MySQLStorage + if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil { + t.Fatal("Error on opening database connection: ", err) + } else { + mysql = d.(*engine.MySQLStorage) + } + for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL} { + if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil { + t.Fatal("Error on mysql creation: ", err.Error()) + return // No point in going further + } + } + for _, tbl := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA} { + if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", tbl)); err != nil { + t.Fatal(err.Error()) + } + } +} + +// Finds cgr-engine executable and starts it with default configuration +func TestStartEngine(t *testing.T) { + if !*testLocal { + return + } + enginePath, err := exec.LookPath("cgr-engine") + if err != nil { + t.Fatal("Cannot find cgr-engine executable") + } + exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it + engine := exec.Command(enginePath, "-rater", "-scheduler", "-cdrs", "-mediator", "-config", path.Join(*dataDir, "conf", "cgrates.cfg")) + if err := engine.Start(); err != nil { + t.Fatal("Cannot start cgr-engine: ", err.Error()) + } + time.Sleep(time.Duration(*startDelay) * time.Millisecond) // Give time to rater to fire up +} + +// Connect rpc client +func TestRpcConn(t *testing.T) { + if !*testLocal { + return + } + var err error + cgrRpc, err = rpc.Dial("tcp", cfg.RPCGOBListen) //ToDo: Fix with automatic config + if err != nil { + t.Fatal("Could not connect to CGR GOB-RPC Server: ", err.Error()) + } +} + +// Test here LoadTariffPlanFromFolder +func TestLoadTariffPlanFromFolder(t *testing.T) { + if !*testLocal { + return + } + reply := "" + // Simple test that command is executed without errors + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")} + if err := cgrRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error()) + } else if reply != utils.OK { + t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply) + } +} + +func TestPostCdrs(t *testing.T) { + if !*testLocal { + return + } + httpClient := new(http.Client) + cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { + cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.HTTPListen), cdrForm); err != nil { + t.Error(err.Error()) + } + } +} + +func TestRateCdrs(t *testing.T) { + if !*testLocal { + return + } + var reply string + if err := cgrRpc.Call("MediatorV1.RateCdrs", utils.AttrRateCdrs{}, &reply); err != nil { + t.Error(err.Error()) + } else if reply != utils.OK { + t.Errorf("Unexpected reply: %s", reply) + } +} + +// Simply kill the engine after we are done with tests within this file +func TestStopEngine(t *testing.T) { + if !*testLocal { + return + } + exec.Command("pkill", "cgr-engine").Run() +} diff --git a/mediator/mediator_rpc.go b/mediator/mediator_rpc.go index b5a52ce66..0fcc7fd62 100644 --- a/mediator/mediator_rpc.go +++ b/mediator/mediator_rpc.go @@ -48,5 +48,6 @@ func (self *MediatorV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error if err := self.Medi.RateCdrs(tStart, tEnd, attrs.RerateErrors, attrs.RerateRated); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } + *reply = utils.OK return nil } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index c12619561..401141380 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -325,3 +325,9 @@ type AttrRateCdrs struct { RerateErrors bool // Rerate previous CDRs with errors (makes sense for reqtype rated and pseudoprepaid RerateRated bool // Rerate CDRs which were previously rated (makes sense for reqtype rated and pseudoprepaid) } + +type AttrLoadTpFromFolder struct { + FolderPath string // Take files from folder absolute path + DryRun bool // Do not write to database but parse only + FlushDb bool // Flush previous data before loading new one +} diff --git a/utils/consts.go b/utils/consts.go index 33982b065..fc3c4c99e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -83,6 +83,7 @@ const ( CDRE_DRYRUN = "dry_run" INTERNAL = "internal" ZERO_RATING_SUBJECT_PREFIX = "*zero" + OK = "OK" ) var ( From bd71dc9a4c3a8bcd8e5f2b298f1bc58771d55f6c Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 10 Feb 2014 20:00:30 +0100 Subject: [PATCH 2/3] Test fixups --- apier/tutfscsv_local_test.go | 2 +- data/conf/cgrates.cfg | 2 +- data/conf/samples/mediator_test1.cfg | 2 +- local_test.sh | 4 +- mediator/mediator_local_test.go | 71 +++++++++++++++++++--------- 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/apier/tutfscsv_local_test.go b/apier/tutfscsv_local_test.go index 70732219a..c4d146069 100644 --- a/apier/tutfscsv_local_test.go +++ b/apier/tutfscsv_local_test.go @@ -135,7 +135,7 @@ func TestFsCsvLoadTariffPlans(t *testing.T) { } reply := "" // Simple test that command is executed without errors - attrs := &AttrLoadTPFromFolder{FolderPath: path.Join(*dataDir, "tutorials", "fs_csv", "cgrates", "tariffplans")} + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tutorials", "fs_csv", "cgrates", "tariffplans")} if err := rater.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error()) } else if reply != "OK" { diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 9b14678c7..fa88dd4d5 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -46,7 +46,7 @@ [cdrs] # enabled = false # Start the CDR Server service: . -# extra_fields = # Extra fields to store in CDRs +# extra_fields = # Extra fields to store in CDRs for non-generic CDRs # mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] diff --git a/data/conf/samples/mediator_test1.cfg b/data/conf/samples/mediator_test1.cfg index 5dfb040e2..fc9765653 100644 --- a/data/conf/samples/mediator_test1.cfg +++ b/data/conf/samples/mediator_test1.cfg @@ -45,7 +45,7 @@ enabled = true # Enable RaterCDRSExportPath service: . [cdrs] enabled = true # Start the CDR Server service: . -# extra_fields = # Extra fields to store in CDRs +# extra_fields = # Extra fields to store in CDRs in case of non generic CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] diff --git a/local_test.sh b/local_test.sh index 183830841..2de420cd0 100755 --- a/local_test.sh +++ b/local_test.sh @@ -8,9 +8,11 @@ go test github.com/cgrates/cgrates/engine -local en=$? go test github.com/cgrates/cgrates/cdrc -local cdrc=$? +go test github.com/cgrates/cgrates/mediator -local +med=$? -exit $gen && $ap && $en && cdrc +exit $gen && $ap && $en && $cdrc && $med diff --git a/mediator/mediator_local_test.go b/mediator/mediator_local_test.go index 7f86dad61..eca4ddb90 100644 --- a/mediator/mediator_local_test.go +++ b/mediator/mediator_local_test.go @@ -50,14 +50,15 @@ README: var cfg *config.CGRConfig var cgrRpc *rpc.Client +var cdrStor engine.CdrStorage var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") var storDbType = flag.String("stordb_type", utils.MYSQL, "The type of the storDb database ") var startDelay = flag.Int("delay_start", 300, "Number of miliseconds to it for rater to start and cache") +var cfgPath = path.Join(*dataDir, "conf", "samples", "mediator_test1.cfg") func init() { - cfgPath := path.Join(*dataDir, "conf", "samples", "mediator_test1.cfg") cfg, _ = config.NewCGRConfig(&cfgPath) } @@ -83,10 +84,11 @@ func TestInitStorDb(t *testing.T) { t.Fatal("Unsupported storDbType") } var mysql *engine.MySQLStorage - if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil { + var err error + if cdrStor, err = engine.ConfigureCdrStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil { t.Fatal("Error on opening database connection: ", err) } else { - mysql = d.(*engine.MySQLStorage) + mysql = cdrStor.(*engine.MySQLStorage) } for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL} { if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil { @@ -111,7 +113,7 @@ func TestStartEngine(t *testing.T) { t.Fatal("Cannot find cgr-engine executable") } exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it - engine := exec.Command(enginePath, "-rater", "-scheduler", "-cdrs", "-mediator", "-config", path.Join(*dataDir, "conf", "cgrates.cfg")) + engine := exec.Command(enginePath, "-config", cfgPath) if err := engine.Start(); err != nil { t.Fatal("Cannot start cgr-engine: ", err.Error()) } @@ -130,6 +132,48 @@ func TestRpcConn(t *testing.T) { } } +func TestPostCdrs(t *testing.T) { + if !*testLocal { + return + } + httpClient := new(http.Client) + cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"dan"}, "subject": []string{"dan"}, "destination": []string{"1002"}, + "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} + for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { + cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.HTTPListen), cdrForm); err != nil { + t.Error(err.Error()) + } + } + if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { + t.Error(err) + } else if len(storedCdrs) != 2 { + t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) + } +} + +// Directly inject CDRs into storDb +func TestInjectCdrs(t *testing.T) { + if !*testLocal { + return + } + cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdr_source": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", + "tenant": "cgrates.org", "tor": "call", "account": "1001", "subject": "1001", "destination": "1002", + "answer_time": "2013-11-07T08:42:26Z", "duration": "10"} + if err := cdrStor.SetCdr(cgrCdr1); err != nil { + t.Error(err) + } + if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { + t.Error(err) + } else if len(storedCdrs) != 3 { + t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) + } +} + // Test here LoadTariffPlanFromFolder func TestLoadTariffPlanFromFolder(t *testing.T) { if !*testLocal { @@ -145,25 +189,6 @@ func TestLoadTariffPlanFromFolder(t *testing.T) { } } -func TestPostCdrs(t *testing.T) { - if !*testLocal { - return - } - httpClient := new(http.Client) - cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, - "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} - cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, - "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} - for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { - cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) - if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.HTTPListen), cdrForm); err != nil { - t.Error(err.Error()) - } - } -} - func TestRateCdrs(t *testing.T) { if !*testLocal { return From 9042c984928f5a4ee2b45f1b8a6039ce75b01f0f Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 11 Feb 2014 12:57:00 +0100 Subject: [PATCH 3/3] Fixup mediation with rerating --- engine/storage_sql.go | 25 +++++++++------- mediator/mediator.go | 2 -- mediator/mediator_local_test.go | 51 +++++++++++++++++++++++++++------ 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 52a8e9a29..b837c2ecf 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -646,7 +646,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), connect_fee=values(connect_fee), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()", + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), uuid, @@ -667,7 +667,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e } func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) var accid, src string var timespansJson string cc = &CallCost{Cost: -1} @@ -739,7 +739,9 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string return } -// Return a slice of CDRs from storDb using optional filters. +// Return a slice of CDRs from storDb using optional filters.a +// ignoreErr - do not consider cdrs with rating errors +// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, ignoreRated bool) ([]*utils.StoredCdr, error) { var cdrs []*utils.StoredCdr q := fmt.Sprintf("SELECT %s.cgrid,accid,cdrhost,cdrsource,reqtype,direction,tenant,tor,account,%s.subject,destination,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS) @@ -756,17 +758,20 @@ func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, i } fltr += fmt.Sprintf(" answer_time<'%d'", timeEnd) } - if ignoreErr { - if len(fltr) != 0 { - fltr += " AND " - } - fltr += "cost>-1" - } if ignoreRated { if len(fltr) != 0 { fltr += " AND " } - fltr += "cost<=0" + if ignoreErr { + fltr += "cost IS NULL" + } else { + fltr += "(cost=-1 OR cost IS NULL)" + } + } else if ignoreErr { + if len(fltr) != 0 { + fltr += " AND " + } + fltr += "(cost!=-1 OR cost IS NULL)" } if len(fltr) != 0 { q += fmt.Sprintf(" WHERE %s", fltr) diff --git a/mediator/mediator.go b/mediator/mediator.go index 12ec519ed..cc34b6127 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -130,12 +130,10 @@ func (self *Mediator) rateCDR(cdr *utils.StoredCdr) error { // Forks original CDR based on original request plus runIds for extra mediation func (self *Mediator) RateCdr(dbcdr utils.RawCDR) error { - //engine.Logger.Debug(fmt.Sprintf("Mediating rawCdr: %v, duration: %d",dbcdr, dbcdr.GetDuration())) rtCdr, err := utils.NewStoredCdrFromRawCDR(dbcdr) if err != nil { return err } - //engine.Logger.Debug(fmt.Sprintf("Have converted raw into rated: %v", rtCdr)) cdrs := []*utils.StoredCdr{rtCdr} // Start with initial dbcdr, will add here all to be mediated for runIdx, runId := range self.cgrCfg.MediatorRunIds { forkedCdr, err := dbcdr.AsStoredCdr(self.cgrCfg.MediatorRunIds[runIdx], self.cgrCfg.MediatorReqTypeFields[runIdx], self.cgrCfg.MediatorDirectionFields[runIdx], diff --git a/mediator/mediator_local_test.go b/mediator/mediator_local_test.go index eca4ddb90..f0d76aeca 100644 --- a/mediator/mediator_local_test.go +++ b/mediator/mediator_local_test.go @@ -138,10 +138,10 @@ func TestPostCdrs(t *testing.T) { } httpClient := new(http.Client) cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"+4986517174963"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"dan"}, "subject": []string{"dan"}, "destination": []string{"1002"}, + "tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"1003"}, "subject": []string{"1003"}, "destination": []string{"+4986517174964"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) @@ -151,9 +151,14 @@ func TestPostCdrs(t *testing.T) { } if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { t.Error(err) - } else if len(storedCdrs) != 2 { + } else if len(storedCdrs) != 2 { // Make sure CDRs made it into StorDb t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) } + if nonErrorCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, false); err != nil { + t.Error(err) + } else if len(nonErrorCdrs) != 0 { // Just two of them should be without errors + t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(nonErrorCdrs))) + } } // Directly inject CDRs into storDb @@ -161,17 +166,27 @@ func TestInjectCdrs(t *testing.T) { if !*testLocal { return } - cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdr_source": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", - "tenant": "cgrates.org", "tor": "call", "account": "1001", "subject": "1001", "destination": "1002", + cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", + "tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517174963", "answer_time": "2013-11-07T08:42:26Z", "duration": "10"} - if err := cdrStor.SetCdr(cgrCdr1); err != nil { - t.Error(err) + cgrCdr2 := utils.CgrCdr{"accid": "baaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", + "tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517173964", + "answer_time": "2013-11-07T09:42:26Z", "duration": "20"} + for _, cdr := range []utils.CgrCdr{ cgrCdr1, cgrCdr2} { + if err := cdrStor.SetCdr(cdr); err != nil { + t.Error(err) + } } if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { t.Error(err) - } else if len(storedCdrs) != 3 { + } else if len(storedCdrs) != 4 { // Make sure CDRs made it into StorDb t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) } + if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil { + t.Error(err) + } else if len(nonRatedCdrs) != 2 { // Just two of them should be non-rated + t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs))) + } } // Test here LoadTariffPlanFromFolder @@ -199,6 +214,26 @@ func TestRateCdrs(t *testing.T) { } else if reply != utils.OK { t.Errorf("Unexpected reply: %s", reply) } + if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil { + t.Error(err) + } else if len(nonRatedCdrs) != 0 { // Just two of them should be non-rated + t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs))) + } + if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil { + t.Error(err) + } else if len(errRatedCdrs) != 2 { // The first 2 with errors should be still there before rerating + t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs))) + } + if err := cgrRpc.Call("MediatorV1.RateCdrs", utils.AttrRateCdrs{RerateErrors: true}, &reply); err != nil { + t.Error(err.Error()) + } else if reply != utils.OK { + t.Errorf("Unexpected reply: %s", reply) + } + if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil { + t.Error(err) + } else if len(errRatedCdrs) != 1 { // One CDR with errors should be fixed now by rerating + t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs))) + } } // Simply kill the engine after we are done with tests within this file