General tests and configs for multiple cdrc imports

This commit is contained in:
DanB
2014-05-25 16:23:05 +02:00
parent dbf6379818
commit 00248b16dc
9 changed files with 242 additions and 34 deletions

View File

@@ -86,7 +86,7 @@ func (self *Cdrc) Run() error {
// Takes the record out of csv and turns it into http form which can be posted
func (self *Cdrc) recordForkCdr(record []string) (*utils.StoredCdr, error) {
storedCdr := &utils.StoredCdr{CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
storedCdr := &utils.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
var err error
for cfgFieldName, cfgFieldRSR := range self.cdrFields {
var fieldVal string
@@ -120,15 +120,15 @@ func (self *Cdrc) recordForkCdr(record []string) (*utils.StoredCdr, error) {
storedCdr.Destination = fieldVal
case utils.SETUP_TIME:
if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(fieldVal); err != nil {
return nil, fmt.Errorf("Cannot parse answer time field, err: %s", err.Error())
return nil, fmt.Errorf("Cannot parse answer time field with value: %s, err: %s", fieldVal, err.Error())
}
case utils.ANSWER_TIME:
if storedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(fieldVal); err != nil {
return nil, fmt.Errorf("Cannot parse answer time field, err: %s", err.Error())
return nil, fmt.Errorf("Cannot parse answer time field with value: %s, err: %s", fieldVal, err.Error())
}
case utils.USAGE:
if storedCdr.Usage, err = utils.ParseDurationWithSecs(fieldVal); err != nil {
return nil, fmt.Errorf("Cannot parse duration field, err: %s", err.Error())
if storedCdr.Usage, err = utils.ParseDurationWithNanosecs(fieldVal); err != nil {
return nil, fmt.Errorf("Cannot parse duration field with value: %s, err: %s", fieldVal, err.Error())
}
default: // Extra fields will not match predefined so they all show up here
storedCdr.ExtraFields[cfgFieldName] = fieldVal

View File

@@ -81,7 +81,7 @@ func startEngine() error {
return errors.New("Cannot find cgr-engine executable")
}
stopEngine()
engine := exec.Command(enginePath, "-cdrs", "-config", cfgPath)
engine := exec.Command(enginePath, "-config", cfgPath)
if err := engine.Start(); err != nil {
return fmt.Errorf("Cannot start cgr-engine: %s", err.Error())
}

View File

@@ -40,7 +40,7 @@ func TestRecordForkCdr(t *testing.T) {
t.Error("Failed to corectly detect missing fields from record")
}
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62000000000", "supplier1", "172.16.1.1"}
rtCdr, err := cdrc.recordForkCdr(cdrRow)
if err != nil {
t.Error("Failed to parse CDR in rated cdr", err)
@@ -49,6 +49,7 @@ func TestRecordForkCdr(t *testing.T) {
CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()),
TOR: cdrRow[2],
AccId: cdrRow[3],
CdrHost: "0.0.0.0", // Got it over internal interface
CdrSource: cgrConfig.CdrcSourceId,
ReqType: cdrRow[4],
Direction: cdrRow[5],

View File

@@ -2,7 +2,7 @@
# Copyright (C) 2012-2014 ITsysCOM GmbH
[global]
xmlcfg_path = multiplecdrc_fwexport.xml
xmlcfg_path = /usr/share/cgrates/conf/samples/multiplecdrc_fwexport.xml
[rater]
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
@@ -12,7 +12,7 @@ enabled = true # Starts Scheduler service: <true|false>.
[cdrs]
enabled = true # Start the CDR Server service: <true|false>.
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
[cdre]
export_dir = /tmp/cgrates/cdr/cdre/csv # Path where the exported CDRs will be placed

View File

@@ -4,46 +4,48 @@
<enabled>true</enabled>
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<csv_separator>,</csv_separator>
<run_delay>0</run_delay>
<cdr_in_dir>/tmp/cgrates/cdrc2/in</cdr_in_dir>
<cdr_out_dir>/tmp/cgrates/cdrc2/out</cdr_out_dir>
<cdr_source_id>csv2</cdr_source_id>
<fields>
<field id="tor" filter="7" />
<field id="tor" filter="~7:s/^(voice|data)$/*$1/" />
<field id="accid" filter="0" />
<field id="reqtype" filter="^rated" />
<field id="direction" filter="^*out" />
<field id="tenant" filter="^cgrates.org" />
<field id="category" filter="7" />
<field id="category" filter="~7:s/^voice$/call/" />
<field id="account" filter="3" />
<field id="subject" filter="3" />
<field id="destination" filter="5" />
<field id="destination" filter="~5:s/^0([1-9]+)$/+49$1/" />
<field id="setup_time" filter="1" />
<field id="answer_time" filter="1" />
<field id="usage" filter="9" />
<field id="usage" filter="~9:s/^(\d+)$/${1}s/" />
</fields>
</configuration>
<configuration section="cdrc" type="csv" id="CDRC-CSV3">
<enabled>true</enabled>
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<csv_separator>;</csv_separator>
<run_delay>0</run_delay>
<cdr_in_dir>/tmp/cgrates/cdrc3/in</cdr_in_dir>
<cdr_out_dir>/tmp/cgrates/cdrc3/out</cdr_out_dir>
<cdr_source_id>csv3</cdr_source_id>
<fields>
<field id="tor" filter="7" />
<field id="accid" filter="0" />
<field id="tor" filter="^*voice" />
<field id="accid" filter="~3:s/^(\d{2})\.(\d{2})\.(\d{4})\s{2}(\d{2}):(\d{2}):(\d{2})$/$1$2$3$4$5$6/" />
<field id="reqtype" filter="^rated" />
<field id="direction" filter="^*out" />
<field id="tenant" filter="^cgrates.org" />
<field id="category" filter="7" />
<field id="account" filter="3" />
<field id="subject" filter="3" />
<field id="destination" filter="5" />
<field id="setup_time" filter="1" />
<field id="answer_time" filter="1" />
<field id="usage" filter="9" />
<field id="category" filter="^call" />
<field id="account" filter="~0:s/^([1-9]+)$/+$1/" />
<field id="subject" filter="~0:s/^([1-9]+)$/+$1/" />
<field id="destination" filter="~1:s/^([1-9]+)$/+$1/" />
<field id="setup_time" filter="4" />
<field id="answer_time" filter="4" />
<field id="usage" filter="~6:s/^(\d+)$/${1}s/" />
</fields>
</configuration>
<configuration section="cdre" type="fwv" id="CDRE-FW1">

View File

@@ -57,7 +57,7 @@ TOPUP10_AC1,*topup_reset,*voice,*out,40,*unlimited,DST_UK_Mobile_BIG5,discounted
actionPlans := `TOPUP10_AT,TOPUP10_AC,ASAP,10
TOPUP10_AT,TOPUP10_AC1,ASAP,10`
actionTriggers := ``
accountActions := `cgrates.org,12345,*out,TOPUP10_AT,`
accountActions := `cgrates.org,12344,*out,TOPUP10_AT,`
derivedCharges := ``
csvr := engine.NewStringCSVReader(ratingDb, acntDb, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges)
@@ -101,7 +101,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
t.Fatal(err)
}
csvr.WriteToDatabase(false, false)
if acnt, err := acntDb.GetAccount("*out:cgrates.org:12345"); err != nil {
if acnt, err := acntDb.GetAccount("*out:cgrates.org:12344"); err != nil {
t.Error(err)
} else if acnt == nil {
t.Error("No account saved")
@@ -127,7 +127,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
func TestExecuteActions(t *testing.T) {
scheduler.NewScheduler().LoadActionTimings(acntDb)
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := acntDb.GetAccount("*out:cgrates.org:12345"); err != nil {
if acnt, err := acntDb.GetAccount("*out:cgrates.org:12344"); err != nil {
t.Error(err)
} else if len(acnt.BalanceMap) != 2 {
t.Error("Account does not have enough balances: ", acnt.BalanceMap)
@@ -143,8 +143,8 @@ func TestDebit(t *testing.T) {
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "12345",
Account: "12345",
Subject: "12344",
Account: "12344",
Destination: "447956933443",
TimeStart: time.Date(2014, 3, 4, 6, 0, 0, 0, time.UTC),
TimeEnd: time.Date(2014, 3, 4, 6, 0, 10, 0, time.UTC),
@@ -154,7 +154,7 @@ func TestDebit(t *testing.T) {
} else if cc.Cost != 0.01 {
t.Error("Wrong cost returned: ", cc.Cost)
}
acnt, err := acntDb.GetAccount("*out:cgrates.org:12345")
acnt, err := acntDb.GetAccount("*out:cgrates.org:12344")
if err != nil {
t.Error(err)
}

View File

@@ -55,7 +55,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
actions := `TOPUP10_AC1,*topup_reset,*voice,*out,40,*unlimited,DST_UK_Mobile_BIG5,discounted_minutes,10,,,10`
actionPlans := `TOPUP10_AT,TOPUP10_AC1,ASAP,10`
actionTriggers := ``
accountActions := `cgrates.org,12345,*out,TOPUP10_AT,`
accountActions := `cgrates.org,12346,*out,TOPUP10_AT,`
derivedCharges := ``
csvr := engine.NewStringCSVReader(ratingDb3, acntDb3, ',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges)
@@ -99,7 +99,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
t.Fatal(err)
}
csvr.WriteToDatabase(false, false)
if acnt, err := acntDb3.GetAccount("*out:cgrates.org:12345"); err != nil {
if acnt, err := acntDb3.GetAccount("*out:cgrates.org:12346"); err != nil {
t.Error(err)
} else if acnt == nil {
t.Error("No account saved")
@@ -123,7 +123,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
func TestExecuteActions3(t *testing.T) {
scheduler.NewScheduler().LoadActionTimings(acntDb3)
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := acntDb3.GetAccount("*out:cgrates.org:12345"); err != nil {
if acnt, err := acntDb3.GetAccount("*out:cgrates.org:12346"); err != nil {
t.Error(err)
} else if len(acnt.BalanceMap) != 1 {
t.Error("Account does not have enough balances: ", acnt.BalanceMap)
@@ -137,8 +137,8 @@ func TestDebit3(t *testing.T) {
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "12345",
Account: "12345",
Subject: "12346",
Account: "12346",
Destination: "447956933443",
TimeStart: time.Date(2014, 3, 4, 6, 0, 0, 0, time.UTC),
TimeEnd: time.Date(2014, 3, 4, 6, 0, 10, 0, time.UTC),
@@ -148,7 +148,7 @@ func TestDebit3(t *testing.T) {
} else if cc.Cost != 0.01 {
t.Error("Wrong cost returned: ", cc.Cost)
}
acnt, err := acntDb3.GetAccount("*out:cgrates.org:12345")
acnt, err := acntDb3.GetAccount("*out:cgrates.org:12346")
if err != nil {
t.Error(err)
}

View File

@@ -0,0 +1,191 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2014 ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"errors"
"flag"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"os/exec"
"path"
"testing"
"time"
)
var cfgPath string
var cfg *config.CGRConfig
var rater *rpc.Client
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database <mysql>")
var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
func init() {
cfgPath = path.Join(*dataDir, "conf", "samples", "multiplecdrc_fwexport.cfg")
cfg, _ = config.NewCGRConfigFromFile(&cfgPath)
}
func startEngine() error {
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
return errors.New("Cannot find cgr-engine executable")
}
stopEngine()
engine := exec.Command(enginePath, "-config", cfgPath)
if err := engine.Start(); err != nil {
return fmt.Errorf("Cannot start cgr-engine: %s", err.Error())
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
return nil
}
func stopEngine() error {
exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
return nil
}
func TestEmptyTables(t *testing.T) {
if !*testLocal {
return
}
if *storDbType != utils.MYSQL {
t.Fatal("Unsupported storDbType")
}
var mysql *engine.MySQLStorage
if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
t.Fatal("Error on opening database connection: ", err)
} else {
mysql = d.(*engine.MySQLStorage)
}
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, engine.CREATE_CDRS_TABLES_SQL)); err != nil {
t.Fatal("Error on mysql creation: ", err.Error())
return // No point in going further
}
for _, tbl := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA} {
if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", tbl)); err != nil {
t.Fatal(err.Error())
}
}
}
func TestCreateCdrDirs(t *testing.T) {
if !*testLocal {
return
}
for _, cdrcDir := range []string{cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir,
cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV2")["CDRC-CSV2"].CdrInDir, cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV2")["CDRC-CSV2"].CdrOutDir,
cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV3")["CDRC-CSV3"].CdrInDir, cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV3")["CDRC-CSV3"].CdrOutDir} {
if err := os.RemoveAll(cdrcDir); err != nil {
t.Fatal("Error removing folder: ", cdrcDir, err)
}
if err := os.MkdirAll(cdrcDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cdrcDir, err)
}
}
}
// Connect rpc client to rater
func TestRpcConn(t *testing.T) {
if !*testLocal {
return
}
startEngine()
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var err error
rater, err = jsonrpc.Dial("tcp", cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
// Test here LoadTariffPlanFromFolder
func TestApierLoadTariffPlanFromFolder(t *testing.T) {
if !*testLocal {
return
}
reply := ""
// Simple test that command is executed without errors
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "prepaid1centpsec")}
if err := rater.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.LoadTariffPlanFromFolder: ", err.Error())
} else if reply != "OK" {
t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
// The default scenario, out of cdrc defined in .cfg file
func TestHandleCdr1File(t *testing.T) {
if !*testLocal {
return
}
var fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10000000000,1.0100,val_extra3,"",val_extra1
dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,rated,*out,cgrates.org,call,1001,1001,+4986517174964,2013-11-07 09:42:25 +0000 UTC,2013-11-07 09:42:26 +0000 UTC,20000000000,1.0100,val_extra3,"",val_extra1
`
fileName := "file1.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcCdrInDir, fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
// Scenario out of first .xml config
func TestHandleCdr2File(t *testing.T) {
if !*testLocal {
return
}
var fileContent = `616350843,20131022145011,20131022172857,3656,1001,,,data,mo,640113,0.000000,1.222656,1.222660
616199016,20131022154924,20131022154955,3656,1001,086517174963,,voice,mo,31,0.000000,0.000000,0.000000`
fileName := "file2.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(tmpFilePath, path.Join(cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV2")["CDRC-CSV2"].CdrInDir, fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
// Scenario out of second .xml config
func TestHandleCdr3File(t *testing.T) {
if !*testLocal {
return
}
var fileContent = `4986517174960;4986517174963;Sample Mobile;08.04.2014 22:14:29;08.04.2014 22:14:29;1;193;Offeak;0,072728833;31619
4986517174960;4986517174964;National;08.04.2014 20:34:55;08.04.2014 20:34:55;1;21;Offeak;0,0079135;311`
fileName := "file3.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(tmpFilePath, path.Join(cfg.XmlCfgDocument.GetCdrcCfgs("CDRC-CSV3")["CDRC-CSV3"].CdrInDir, fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}

View File

@@ -97,3 +97,17 @@ func TestRSRParseStatic(t *testing.T) {
t.Errorf("Expected: %s, received: %s", "static_hdrvalue", parsed)
}
}
func TestConvertDurToSecs(t *testing.T) {
expectRSRField := &RSRField{Id: "9", RSRules: []*ReSearchReplace{
&ReSearchReplace{regexp.MustCompile(`^(\d+)$`), "${1}s"}}}
rsrField, err := NewRSRField(`~9:s/^(\d+)$/${1}s/`)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rsrField, expectRSRField) {
t.Errorf("Expecting: %v, received: %v", expectRSRField, rsrField)
}
if parsedVal := rsrField.ParseValue("640113"); parsedVal != "640113s" {
t.Errorf("Expecting: 640113s, received: %s", parsedVal)
}
}