diff --git a/build.sh b/build.sh index 006496d11..82a75a6ff 100755 --- a/build.sh +++ b/build.sh @@ -6,7 +6,9 @@ go install github.com/cgrates/cgrates/cmd/cgr-loader cl=$? go install github.com/cgrates/cgrates/cmd/cgr-console cc=$? +go install github.com/cgrates/cgrates/cmd/cgr-tester +ct=$? -exit $cr || $cl || $cc +exit $cr || $cl || $cc || $ct diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 02ba1bb0f..b51afdb40 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -44,11 +44,11 @@ var ( ratingdb_pass = flag.String("ratingdb_passwd", cgrConfig.RatingDBPass, "The RatingDb user's password.") accountdb_type = flag.String("accountdb_type", cgrConfig.AccountDBType, "The type of the AccountingDb database ") - accountdb_typehost = flag.String("accountdb_host", cgrConfig.AccountDBHost, "The AccountingDb host to connect to.") - accountdb_typeport = flag.String("accountdb_port", cgrConfig.AccountDBPort, "The AccountingDb port to bind to.") - accountdb_typename = flag.String("accountdb_name", cgrConfig.AccountDBName, "The name/number of the AccountingDb to connect to.") - accountdb_typeuser = flag.String("accountdb_user", cgrConfig.AccountDBUser, "The AccountingDb user to sign in as.") - accountdb_typepass = flag.String("accountdb_passwd", cgrConfig.AccountDBPass, "The AccountingDb user's password.") + accountdb_host = flag.String("accountdb_host", cgrConfig.AccountDBHost, "The AccountingDb host to connect to.") + accountdb_port = flag.String("accountdb_port", cgrConfig.AccountDBPort, "The AccountingDb port to bind to.") + accountdb_name = flag.String("accountdb_name", cgrConfig.AccountDBName, "The name/number of the AccountingDb to connect to.") + accountdb_user = flag.String("accountdb_user", cgrConfig.AccountDBUser, "The AccountingDb user to sign in as.") + accountdb_pass = flag.String("accountdb_passwd", cgrConfig.AccountDBPass, "The AccountingDb user's password.") stor_db_type = flag.String("stordb_type", cgrConfig.StorDBType, "The type of the storDb database ") stor_db_host = flag.String("stordb_host", cgrConfig.StorDBHost, "The storDb host to connect to.") @@ -70,7 +70,7 @@ var ( toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving") raterAddress = flag.String("rater_address", cgrConfig.MediatorRater, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") - rpcEncoding = flag.String("rpc_encoding", "json", "The history server rpc encoding json|gob") + rpcEncoding = flag.String("rpc_encoding", cgrConfig.RPCEncoding, "The history server rpc encoding json|gob") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -91,14 +91,14 @@ func main() { if *fromStorDb { ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding) - accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_typehost, *accountdb_typeport, *accountdb_typename, *accountdb_typeuser, *accountdb_typepass, *dbdata_encoding) + accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding) storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) } else if *toStorDb { // Import from csv files to storDb storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) } else { // Default load from csv files to dataDb ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding) - accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_typehost, *accountdb_typeport, *accountdb_typename, *accountdb_typeuser, *accountdb_typepass, *dbdata_encoding) + accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding) } // Defer databases opened to be closed when we are done for _, db := range []engine.Storage{ratingDb, accountDb, storDb} { @@ -157,7 +157,7 @@ func main() { log.Print("WARNING: Rates history archiving is disabled!") } if *raterAddress != "" { // Init connection to rater so we can reload it's data - if *rpcEncoding == "json" { + if *rpcEncoding == config.JSON { rater, err = jsonrpc.Dial("tcp", *raterAddress) } else { rater, err = rpc.Dial("tcp", *raterAddress) diff --git a/cmd/stress/README.md b/cmd/cgr-tester/README.md similarity index 100% rename from cmd/stress/README.md rename to cmd/cgr-tester/README.md diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go new file mode 100644 index 000000000..575163121 --- /dev/null +++ b/cmd/cgr-tester/cgr-tester.go @@ -0,0 +1,185 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package main + +import ( + "flag" + "log" + "os" + "runtime" + "runtime/pprof" + "time" + "fmt" + "net/rpc" + "net/rpc/jsonrpc" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/config" +) + +var ( + cgrConfig, _ = config.NewDefaultCGRConfig() + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") + memprofile = flag.String("memprofile", "", "write memory profile to this file") + runs = flag.Int("runs", 10000, "stress cycle number") + parallel = flag.Int("parallel", 0, "run n requests in parallel") + ratingdb_type = flag.String("ratingdb_type", cgrConfig.RatingDBType, "The type of the RatingDb database ") + ratingdb_host = flag.String("ratingdb_host", cgrConfig.RatingDBHost, "The RatingDb host to connect to.") + ratingdb_port = flag.String("ratingdb_port", cgrConfig.RatingDBPort, "The RatingDb port to bind to.") + ratingdb_name = flag.String("ratingdb_name", cgrConfig.RatingDBName, "The name/number of the RatingDb to connect to.") + ratingdb_user = flag.String("ratingdb_user", cgrConfig.RatingDBUser, "The RatingDb user to sign in as.") + ratingdb_pass = flag.String("ratingdb_passwd", cgrConfig.RatingDBPass, "The RatingDb user's password.") + accountdb_type = flag.String("accountdb_type", cgrConfig.AccountDBType, "The type of the AccountingDb database ") + accountdb_host = flag.String("accountdb_host", cgrConfig.AccountDBHost, "The AccountingDb host to connect to.") + accountdb_port = flag.String("accountdb_port", cgrConfig.AccountDBPort, "The AccountingDb port to bind to.") + accountdb_name = flag.String("accountdb_name", cgrConfig.AccountDBName, "The name/number of the AccountingDb to connect to.") + accountdb_user = flag.String("accountdb_user", cgrConfig.AccountDBUser, "The AccountingDb user to sign in as.") + accountdb_pass = flag.String("accountdb_passwd", cgrConfig.AccountDBPass, "The AccountingDb user's password.") + dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings.") + raterAddress = flag.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.") + rpcEncoding = flag.String("rpc_encoding", cgrConfig.RPCEncoding, "Rpc encoding to use when talking to remote rater ") + tor = flag.String("tor", "call", "The type of record to use in queries.") + tenant = flag.String("tenant", "call", "The type of record to use in queries.") + subject = flag.String("subject", "1001", "The rating subject to use in queries.") + destination = flag.String("destination", "+4986517174963", "The destination to use in queries.") + + nilDuration = time.Duration(0) +) + +func durInternalRater( cd *engine.CallDescriptor) (time.Duration, error) { + ratingDb, err := engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding) + if err != nil { + return nilDuration, fmt.Errorf("Could not connect to rating database: %s", err.Error()) + } + defer ratingDb.Close() + engine.SetRatingStorage(ratingDb) + accountDb, err := engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding) + if err != nil { + return nilDuration, fmt.Errorf("Could not connect to accounting database: %s", err.Error()) + } + defer accountDb.Close() + engine.SetAccountingStorage(accountDb) + if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error()) + } + log.Printf("Runnning %d cycles...", *runs) + var result *engine.CallCost + j := 0 + start := time.Now() + for i := 0; i < *runs; i++ { + result, err = cd.GetCost() + if *memprofile != "" { + runtime.MemProfileRate = 1 + runtime.GC() + f, err := os.Create(*memprofile) + if err != nil { + log.Fatal(err) + } + pprof.WriteHeapProfile(f) + f.Close() + break + } + j = i + } + log.Print(result, j, err) + memstats := new(runtime.MemStats) + runtime.ReadMemStats(memstats) + log.Printf("memstats before GC: Kbytes = %d footprint = %d", + memstats.HeapAlloc/1024, memstats.Sys/1024) + return time.Since(start), nil +} + + +func durRemoteRater( cd *engine.CallDescriptor) (time.Duration, error) { + result := engine.CallCost{} + var client *rpc.Client + var err error + if *rpcEncoding=="json" { + client, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + client, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { + return nilDuration, fmt.Errorf("Could not connect to engine: ", err.Error()) + } + defer client.Close() + start := time.Now() + if *parallel > 0 { + // var divCall *rpc.Call + var sem = make(chan int, *parallel) + var finish = make(chan int) + for i := 0; i < *runs; i++ { + go func() { + sem <- 1 + client.Call("Responder.GetCost", cd, &result) + <-sem + finish <- 1 + // divCall = client.Go("Responder.GetCost", cd, &result, nil) + }() + } + for i := 0; i < *runs; i++ { + <-finish + } + // <-divCall.Done + } else { + for j := 0; j < *runs; j++ { + client.Call("Responder.GetCost", cd, &result) + } + } + log.Println(result) + return time.Since(start), nil +} + + + + +func main() { + flag.Parse() + runtime.GOMAXPROCS(runtime.NumCPU() - 1) + + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + cd := &engine.CallDescriptor{ + TimeStart: time.Date(2013, time.December, 13, 22, 30, 0, 0, time.UTC), + TimeEnd: time.Date(2013, time.December, 13, 22, 31, 0, 0, time.UTC), + CallDuration: 60 * time.Second, + Direction: "*out", + TOR: *tor, + Tenant: *tenant, + Subject: *subject, + Destination: *destination, + } + var duration time.Duration + var err error + if len(*raterAddress) == 0 { + duration, err = durInternalRater(cd) + } else { + duration, err = durRemoteRater(cd) + } + if err != nil { + log.Fatal(err.Error()) + } else { + log.Printf("Elapsed: %d resulted: %f req/s.", duration, float64(*runs)/duration.Seconds()) + } +} diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.go b/cmd/stress/cgr-raterstress/cgr-raterstress.go deleted file mode 100644 index 6945c96cd..000000000 --- a/cmd/stress/cgr-raterstress/cgr-raterstress.go +++ /dev/null @@ -1,102 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2013 ITsysCOM - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "flag" - "log" - "net/rpc" - "os" - "github.com/cgrates/cgrates/engine" - //"net/rpc/jsonrpc" - "net/rpc/jsonrpc" - "runtime" - "runtime/pprof" - "time" -) - -var ( - balancer = flag.String("balancer", "localhost:2012", "balancer server address") - runs = flag.Int("runs", 10000, "stress cycle number") - parallel = flag.Int("parallel", 0, "run n requests in parallel") - json = flag.Bool("json", false, "use JSON for RPC encoding") - cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") -) - -func main() { - flag.Parse() - if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) - if err != nil { - log.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - runtime.GOMAXPROCS(runtime.NumCPU()) - - cd := engine.CallDescriptor{ - TimeStart: time.Date(2013, time.December, 13, 22, 30, 0, 0, time.UTC), - TimeEnd: time.Date(2013, time.December, 13, 22, 31, 0, 0, time.UTC), - CallDuration: 60 * time.Second, - Direction: "*out", - TOR: "call", - Tenant: "cgrates.org", - Subject: "1001", - Destination: "+49", - } - result := engine.CallCost{} - var client *rpc.Client - var err error - if *json { - client, err = jsonrpc.Dial("tcp", *balancer) - } else { - client, err = rpc.Dial("tcp", *balancer) - } - if err != nil { - log.Fatal("Could not connect to engine: ", err) - } - start := time.Now() - if *parallel > 0 { - // var divCall *rpc.Call - var sem = make(chan int, *parallel) - var finish = make(chan int) - for i := 0; i < *runs; i++ { - go func() { - sem <- 1 - client.Call("Responder.GetCost", cd, &result) - <-sem - finish <- 1 - // divCall = client.Go("Responder.GetCost", cd, &result, nil) - }() - } - for i := 0; i < *runs; i++ { - <-finish - } - // <-divCall.Done - } else { - for j := 0; j < *runs; j++ { - client.Call("Responder.GetCost", cd, &result) - } - } - duration := time.Since(start) - log.Println(result) - client.Close() - log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds()) -} diff --git a/cmd/stress/cgr-spansstress/cgr-spansstress.go b/cmd/stress/cgr-spansstress/cgr-spansstress.go deleted file mode 100644 index 28420c564..000000000 --- a/cmd/stress/cgr-spansstress/cgr-spansstress.go +++ /dev/null @@ -1,104 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2013 ITsysCOM - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "flag" - "log" - "os" - "runtime" - "runtime/pprof" - "time" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -var ( - cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") - memprofile = flag.String("memprofile", "", "write memory profile to this file") - runs = flag.Int("runs", 10000, "stress cycle number") -) - -func main() { - flag.Parse() - runtime.GOMAXPROCS(runtime.NumCPU() - 1) - - if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) - if err != nil { - log.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - cd := engine.CallDescriptor{ - TimeStart: time.Date(2013, time.December, 13, 22, 30, 0, 0, time.UTC), - TimeEnd: time.Date(2013, time.December, 13, 22, 31, 0, 0, time.UTC), - CallDuration: 60 * time.Second, - Direction: "*out", - TOR: "call", - Tenant: "185.25.80.254", - Subject: "dan", - Destination: "+4986517174963", - } - ratingDb, err := engine.ConfigureRatingStorage(utils.REDIS, "127.0.0.1", "6379", "10", "", "", utils.MSGPACK) - if err != nil { - log.Fatal("Could not connect to rating store: ", err) - } - defer ratingDb.Close() - engine.SetRatingStorage(ratingDb) - accountDb, err := engine.ConfigureAccountingStorage(utils.REDIS, "127.0.0.1", "6379", "11", "", "", utils.MSGPACK) - if err != nil { - log.Fatal("Could not connect to accounting store: ", err) - } - defer accountDb.Close() - engine.SetAccountingStorage(accountDb) - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { - log.Printf("Cache rating error: %v", err) - return - } - - log.Printf("Runnning %d cycles...", *runs) - var result *engine.CallCost - j := 0 - start := time.Now() - for i := 0; i < *runs; i++ { - result, err = cd.GetCost() - if *memprofile != "" { - runtime.MemProfileRate = 1 - runtime.GC() - f, err := os.Create(*memprofile) - if err != nil { - log.Fatal(err) - } - pprof.WriteHeapProfile(f) - f.Close() - break - } - j = i - } - duration := time.Since(start) - log.Print(result, j, err) - memstats := new(runtime.MemStats) - runtime.ReadMemStats(memstats) - log.Printf("memstats before GC: Kbytes = %d footprint = %d", - memstats.HeapAlloc/1024, memstats.Sys/1024) - log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds()) -} diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.py b/data/tester/cgr-tester.py similarity index 100% rename from cmd/stress/cgr-raterstress/cgr-raterstress.py rename to data/tester/cgr-tester.py diff --git a/docs/history.rst b/docs/history.rst index c1f80a65f..ef3a3c12f 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,9 +1,9 @@ -Rates history -============= +Rating history +============== Enhances CGRateS with ability to archive rates modifications. -Ability to scale by using server-agents approach. +Large scaling posibility using server-agents approach. In a distributed environment, there will be a single server (which can be backed up using technologies such as Linux-HA) and more agents sending the modifications to be archived. History-Server @@ -13,12 +13,12 @@ Part of the *cgr-engine*. Controlled within *history_server* section of the configuration file. -Stores rates archive in a .git folder, hence making the rating changes available for analysis via any git browser tool (eg: gitg in linux). +Stores rating archive in a .git folder, hence making the changes available for analysis via any git browser tool (eg: gitg in linux). Functionality: -- On startup reads the rates archive out of .git folder and caches the data. -- When receiving rates information from the agents it will recompile the rates cache. +- On startup reads the rating archive out of .git folder and caches the data. +- When receiving rating information from the agents it will recompile the cache. - Based on configured save interval it will dump the rating cache (if changed) into the .git archive. - Archives the following rating data: - Destinations inside *destinations.json* file. @@ -28,7 +28,7 @@ Functionality: History-Agent ------------- -Integrated in the rates loader components. +Integrated in the rating loader components. Part of *cgr-engine* and *cgr-loader*. diff --git a/importer.go b/importer.go index 04b1426c2..6d7c8a6ff 100644 --- a/importer.go +++ b/importer.go @@ -22,4 +22,5 @@ import ( _ "github.com/cgrates/cgrates/cmd/cgr-console" _ "github.com/cgrates/cgrates/cmd/cgr-engine" _ "github.com/cgrates/cgrates/cmd/cgr-loader" + _ "github.com/cgrates/cgrates/cmd/cgr-tester" )