mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Engine to initialize database only when needed, improved db.Flush(), more CdrsV2 local tests
This commit is contained in:
@@ -20,6 +20,7 @@ package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -55,3 +56,8 @@ func (apier *ApierV2) CountCdrs(attrs utils.RpcCdrsFilter, reply *int64) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Receive CDRs via RPC methods, not included with APIer because it has way less dependencies and can be standalone
|
||||
type CdrsV2 struct {
|
||||
v1.CDRSV1
|
||||
}
|
||||
|
||||
@@ -25,8 +25,10 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"os/exec"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
|
||||
@@ -37,6 +39,8 @@ var cdrsCfgPath string
|
||||
var cdrsCfg *config.CGRConfig
|
||||
var cdrsRpc *rpc.Client
|
||||
|
||||
var cmdEngineCdrsMysql *exec.Cmd
|
||||
|
||||
func TestInitConfig(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
@@ -49,7 +53,7 @@ func TestInitConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsInitDataDb(t *testing.T) {
|
||||
func TestV2CdrsMysqlInitDataDb(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
@@ -58,17 +62,28 @@ func TestV2CdrsInitDataDb(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsStartEngine(t *testing.T) {
|
||||
// InitDb so we can rely on count
|
||||
func TestV2CdrsMysqlInitCdrDb(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := engine.StartEngine(cdrsCfgPath, *waitRater); err != nil {
|
||||
if err := engine.InitCdrDb(cdrsCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsMysqlStartEngine(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
if cmdEngineCdrsMysql, err = engine.StartEngine(cdrsCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect rpc client to rater
|
||||
func TestV2CdrsRpcConn(t *testing.T) {
|
||||
func TestV2CdrsMysqlRpcConn(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
@@ -79,7 +94,39 @@ func TestV2CdrsRpcConn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsGetCdrs(t *testing.T) {
|
||||
// Insert some CDRs
|
||||
func TestV2CdrsMysqlProcessCdr(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var reply string
|
||||
cdrs := []*utils.StoredCdr{
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("abcdeftg", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1002", Subject: "1002", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("aererfddf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
}
|
||||
for _, cdr := range cdrs {
|
||||
if err := cdrsRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsMysqlGetCdrs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
@@ -87,12 +134,12 @@ func TestV2CdrsGetCdrs(t *testing.T) {
|
||||
req := utils.AttrGetCdrs{}
|
||||
if err := cdrsRpc.Call("ApierV2.GetCdrs", req, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} /*else if len(reply) != 2 {
|
||||
} else if len(reply) != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsCountCdrs(t *testing.T) {
|
||||
func TestV2CdrsMysqlCountCdrs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
@@ -100,16 +147,16 @@ func TestV2CdrsCountCdrs(t *testing.T) {
|
||||
req := utils.AttrGetCdrs{}
|
||||
if err := cdrsRpc.Call("ApierV2.CountCdrs", req, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} /*else if len(reply) != 2 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}*/
|
||||
} else if reply != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStopEngine(t *testing.T) {
|
||||
func TestV2CdrsMysqlKillEngine(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := engine.StopEngine(*waitRater); err != nil {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,13 +24,16 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"os/exec"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var cdrsPsqlCfgPath string
|
||||
var cdrsPsqlCfg *config.CGRConfig
|
||||
var cdrsPsqlRpc *rpc.Client
|
||||
var cmdEngineCdrPsql *exec.Cmd
|
||||
|
||||
func TestV2CdrsPsqlInitConfig(t *testing.T) {
|
||||
if !*testLocal {
|
||||
@@ -53,11 +56,22 @@ func TestV2CdrsPsqlInitDataDb(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// InitDb so we can rely on count
|
||||
func TestV2CdrsPsqlInitCdrDb(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := engine.InitCdrDb(cdrsPsqlCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsPsqlStartEngine(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := engine.StartEngine(cdrsPsqlCfgPath, *waitRater); err != nil {
|
||||
var err error
|
||||
if cmdEngineCdrPsql, err = engine.StartEngine(cdrsPsqlCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -74,6 +88,38 @@ func TestV2CdrsPsqlPsqlRpcConn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Insert some CDRs
|
||||
func TestV2CdrsPsqlProcessCdr(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var reply string
|
||||
cdrs := []*utils.StoredCdr{
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("abcdeftg", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1002", Subject: "1002", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
&utils.StoredCdr{CgrId: utils.Sha1("aererfddf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
},
|
||||
}
|
||||
for _, cdr := range cdrs {
|
||||
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsPsqlGetCdrs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
@@ -82,9 +128,9 @@ func TestV2CdrsPsqlGetCdrs(t *testing.T) {
|
||||
req := utils.AttrGetCdrs{}
|
||||
if err := cdrsPsqlRpc.Call("ApierV2.GetCdrs", req, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} /*else if len(reply) != 2 {
|
||||
} else if len(reply) != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsPsqlCountCdrs(t *testing.T) {
|
||||
@@ -95,16 +141,16 @@ func TestV2CdrsPsqlCountCdrs(t *testing.T) {
|
||||
req := utils.AttrGetCdrs{}
|
||||
if err := cdrsPsqlRpc.Call("ApierV2.CountCdrs", req, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} /*else if len(reply) != 2 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}*/
|
||||
} else if reply != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CdrsPsqlStopEngine(t *testing.T) {
|
||||
func TestV2CdrsPsqlKillEngine(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := engine.StopEngine(*waitRater); err != nil {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,7 +208,9 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d
|
||||
cdrServer = engine.NewCdrS(cdrDb, medi, cdrStats, cfg)
|
||||
cdrServer.RegisterHanlersToServer(server)
|
||||
engine.Logger.Info("Registering CDRS RPC service.")
|
||||
server.RpcRegister(&v1.CDRSV1{CdrSrv: cdrServer})
|
||||
cdrSrv := v1.CDRSV1{CdrSrv: cdrServer}
|
||||
server.RpcRegister(&cdrSrv)
|
||||
server.RpcRegister(&v2.CdrsV2{CDRSV1: cdrSrv})
|
||||
responder.CdrSrv = cdrServer // Make the cdrserver available for internal communication
|
||||
close(doneChan)
|
||||
}
|
||||
@@ -343,38 +345,41 @@ func main() {
|
||||
var logDb engine.LogStorage
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort,
|
||||
cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer ratingDb.Close()
|
||||
engine.SetRatingStorage(ratingDb)
|
||||
accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort,
|
||||
cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer accountDb.Close()
|
||||
engine.SetAccountingStorage(accountDb)
|
||||
|
||||
if cfg.StorDBType == SAME {
|
||||
logDb = ratingDb.(engine.LogStorage)
|
||||
} else {
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if required
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort,
|
||||
cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer ratingDb.Close()
|
||||
engine.SetRatingStorage(ratingDb)
|
||||
accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort,
|
||||
cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer accountDb.Close()
|
||||
engine.SetAccountingStorage(accountDb)
|
||||
}
|
||||
if cfg.RaterEnabled || cfg.CDRSEnabled || cfg.MediatorEnabled { // Only connect to storDb if necessary
|
||||
if cfg.StorDBType == SAME {
|
||||
logDb = ratingDb.(engine.LogStorage)
|
||||
} else {
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
defer logDb.Close()
|
||||
engine.SetStorageLogger(logDb)
|
||||
// loadDb,cdrDb and logDb are all mapped on the same stordb storage
|
||||
loadDb = logDb.(engine.LoadStorage)
|
||||
cdrDb = logDb.(engine.CdrStorage)
|
||||
}
|
||||
defer logDb.Close()
|
||||
engine.SetStorageLogger(logDb)
|
||||
// loadDb,cdrDb and logDb are all mapped on the same stordb storage
|
||||
loadDb = logDb.(engine.LoadStorage)
|
||||
cdrDb = logDb.(engine.CdrStorage)
|
||||
|
||||
engine.SetRoundingDecimals(cfg.RoundingDecimals)
|
||||
if cfg.SMDebitInterval > 0 {
|
||||
|
||||
@@ -21,6 +21,7 @@ package engine
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"os/exec"
|
||||
"path"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -35,30 +36,41 @@ func InitDataDb(cfg *config.CGRConfig) error {
|
||||
return err
|
||||
}
|
||||
for _, db := range []Storage{ratingDb, accountDb} {
|
||||
if err := db.Flush(); err != nil {
|
||||
if err := db.Flush(""); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func StartEngine(cfgPath string, waitEngine int) error {
|
||||
enginePath, err := exec.LookPath("cgr-engine")
|
||||
func InitCdrDb(cfg *config.CGRConfig) error {
|
||||
storDb, err := ConfigureLoadStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding,
|
||||
cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := StopEngine(waitEngine); err != nil {
|
||||
if err := storDb.Flush(path.Join(cfg.DataFolderPath, "storage", cfg.StorDBType)); err != nil {
|
||||
return err
|
||||
}
|
||||
engine := exec.Command(enginePath, "-config", cfgPath)
|
||||
if err := engine.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(time.Duration(waitEngine) * time.Millisecond) // Give time to rater to fire up
|
||||
return nil
|
||||
}
|
||||
|
||||
func StopEngine(waitEngine int) error {
|
||||
// Return reference towards the command started so we can stop it if necessary
|
||||
func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
|
||||
enginePath, err := exec.LookPath("cgr-engine")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
KillEngine(waitEngine)
|
||||
engine := exec.Command(enginePath, "-config", cfgPath)
|
||||
if err := engine.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
time.Sleep(time.Duration(waitEngine) * time.Millisecond) // Give time to rater to fire up
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
func KillEngine(waitEngine int) error {
|
||||
if err := exec.Command("pkill", "cgr-engine").Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -186,7 +186,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
return errors.New("No database connection!")
|
||||
}
|
||||
if flush {
|
||||
dataStorage.Flush()
|
||||
dataStorage.Flush("")
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations:")
|
||||
|
||||
@@ -134,7 +134,7 @@ func (dbr *DbReader) ShowStatistics() {
|
||||
func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
storage := dbr.dataDb
|
||||
if flush {
|
||||
storage.Flush()
|
||||
storage.Flush("")
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
|
||||
@@ -82,7 +82,7 @@ func TestConnDataDbs(t *testing.T) {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
for _, db := range []Storage{ratingDbCsv, ratingDbStor, ratingDbApier, accountDbCsv, accountDbStor, accountDbApier} {
|
||||
if err = db.Flush(); err != nil {
|
||||
if err = db.Flush(""); err != nil {
|
||||
t.Fatal("Error when flushing datadb")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestMediInitRatingDb(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal("Cannot connect to dataDb", err)
|
||||
}
|
||||
if err := ratingDb.Flush(); err != nil {
|
||||
if err := ratingDb.Flush(""); err != nil {
|
||||
t.Fatal("Cannot reset dataDb", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ const (
|
||||
|
||||
type Storage interface {
|
||||
Close()
|
||||
Flush() error
|
||||
Flush(string) error
|
||||
GetKeysForPrefix(string) ([]string, error)
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ func NewMapStorageJson() (*MapStorage, error) {
|
||||
|
||||
func (ms *MapStorage) Close() {}
|
||||
|
||||
func (ms *MapStorage) Flush() error {
|
||||
func (ms *MapStorage) Flush(ignore string) error {
|
||||
ms.dict = make(map[string][]byte)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) Flush() (err error) {
|
||||
func (ms *MongoStorage) Flush(ignore string) (err error) {
|
||||
err = ms.db.C("ratingprofiles").DropCollection()
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -24,9 +24,7 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
)
|
||||
|
||||
@@ -51,10 +49,9 @@ type MySQLStorage struct {
|
||||
*SQLStorage
|
||||
}
|
||||
|
||||
func (self *MySQLStorage) Flush() (err error) {
|
||||
cfg := config.CgrConfig()
|
||||
func (self *MySQLStorage) Flush(scriptsPath string) (err error) {
|
||||
for _, scriptName := range []string{CREATE_CDRS_TABLES_SQL, CREATE_TARIFFPLAN_TABLES_SQL} {
|
||||
if err := self.CreateTablesFromScript(path.Join(cfg.DataFolderPath, "storage", utils.MYSQL, scriptName)); err != nil {
|
||||
if err := self.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
_ "github.com/bmizerany/pq"
|
||||
@@ -52,10 +51,9 @@ func NewPostgresStorage(host, port, name, user, password string, maxConn, maxIdl
|
||||
return &PostgresStorage{&SQLStorage{Db: db.DB(), db: db}}, nil
|
||||
}
|
||||
|
||||
func (self *PostgresStorage) Flush() (err error) {
|
||||
cfg := config.CgrConfig()
|
||||
func (self *PostgresStorage) Flush(scriptsPath string) (err error) {
|
||||
for _, scriptName := range []string{CREATE_CDRS_TABLES_SQL, CREATE_TARIFFPLAN_TABLES_SQL} {
|
||||
if err := self.CreateTablesFromScript(path.Join(cfg.DataFolderPath, "storage", utils.POSTGRES, scriptName)); err != nil {
|
||||
if err := self.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ func (rs *RedisStorage) Close() {
|
||||
//rs.db.Quit()
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) Flush() (err error) {
|
||||
func (rs *RedisStorage) Flush(ignore string) (err error) {
|
||||
err = rs.db.Flush(false)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ func TestFlush(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if err := rds.Flush(); err != nil {
|
||||
if err := rds.Flush(""); err != nil {
|
||||
t.Error("Failed to Flush redis database", err.Error())
|
||||
}
|
||||
rds.CacheAccounting(nil, nil, nil, nil)
|
||||
|
||||
@@ -44,7 +44,7 @@ func (self *SQLStorage) Close() {
|
||||
self.db.Close()
|
||||
}
|
||||
|
||||
func (self *SQLStorage) Flush() (err error) {
|
||||
func (self *SQLStorage) Flush(placeholder string) (err error) {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
|
||||
@@ -127,7 +127,7 @@ func TestTutLclLoadTariffPlanFromFolder(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestShutdown (t *testing.T) {
|
||||
func TestShutdown(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user