Completing the implementation of the CDR client with tests

This commit is contained in:
DanB
2013-12-22 18:35:16 +01:00
parent c38179c01a
commit 8e95225b99
22 changed files with 351 additions and 228 deletions

View File

@@ -34,12 +34,12 @@ const (
)
type ApierV1 struct {
StorDb engine.LoadStorage
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
StorDb engine.LoadStorage
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {

View File

@@ -54,6 +54,7 @@ var rater *rpc.Client
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates/data", "CGR data dir path here")
var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database <mysql>")
var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
func init() {
@@ -66,6 +67,9 @@ func TestCreateTables(t *testing.T) {
if !*testLocal {
return
}
if *storDbType != utils.MYSQL {
t.Fatal("Unsupported storDbType")
}
var mysql *engine.MySQLStorage
if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
t.Fatal("Error on opening database connection: ", err)
@@ -73,7 +77,7 @@ func TestCreateTables(t *testing.T) {
mysql = d.(*engine.MySQLStorage)
}
for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL, engine.CREATE_TARIFFPLAN_TABLES_SQL} {
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", "mysql", scriptName)); err != nil {
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *stprDbType, scriptName)); err != nil {
t.Fatal("Error on mysql creation: ", err.Error())
return // No point in going further
}
@@ -93,7 +97,7 @@ func TestInitDataDb(t *testing.T) {
if err != nil {
t.Fatal("Cannot connect to dataDb", err)
}
accountDb, err := engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, cfg.AccountDBName,
accountDb, err := engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, cfg.AccountDBName,
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding)
if err != nil {
t.Fatal("Cannot connect to dataDb", err)
@@ -804,22 +808,22 @@ func TestApierGetRatingPlan(t *testing.T) {
if reply.Id != rplnId {
t.Error("Unexpected id received", reply.Id)
}
if len(reply.Timings)!= 1 || len(reply.Ratings) != 1 {
if len(reply.Timings) != 1 || len(reply.Ratings) != 1 {
t.Error("Unexpected number of items received")
}
/*
riTiming := &engine.RITiming{StartTime: "00:00:00"}
for _, tm := range reply.Timings { // We only get one loop
if !reflect.DeepEqual(tm, riTiming) {
t.Errorf("Unexpected timings value: %v, expecting: %v", tm, riTiming)
riTiming := &engine.RITiming{StartTime: "00:00:00"}
for _, tm := range reply.Timings { // We only get one loop
if !reflect.DeepEqual(tm, riTiming) {
t.Errorf("Unexpected timings value: %v, expecting: %v", tm, riTiming)
}
}
}
*/
riRate := &engine.RIRate{ConnectFee:0, RoundingMethod: "*up", RoundingDecimals: 0, Rates: []*engine.Rate{
&engine.Rate{GroupIntervalStart: 0, Value: 0, RateIncrement: time.Duration(60)*time.Second, RateUnit: time.Duration(60)*time.Second},
}}
riRate := &engine.RIRate{ConnectFee: 0, RoundingMethod: "*up", RoundingDecimals: 0, Rates: []*engine.Rate{
&engine.Rate{GroupIntervalStart: 0, Value: 0, RateIncrement: time.Duration(60) * time.Second, RateUnit: time.Duration(60) * time.Second},
}}
for _, rating := range reply.Ratings {
if !reflect.DeepEqual( rating, riRate ) {
if !reflect.DeepEqual(rating, riRate) {
t.Errorf("Unexpected riRate received: %v", rating)
}
}
@@ -858,7 +862,7 @@ func TestApierExecuteAction(t *testing.T) {
}
reply := ""
// Add balance to a previously known account
attrs := AttrExecuteAction{ Direction: "*out", Tenant: "cgrates.org", Account: "dan2", ActionsId: "PREPAID_10"}
attrs := AttrExecuteAction{Direction: "*out", Tenant: "cgrates.org", Account: "dan2", ActionsId: "PREPAID_10"}
if err := rater.Call("ApierV1.ExecuteAction", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.ExecuteAction: ", err.Error())
} else if reply != "OK" {
@@ -866,13 +870,12 @@ func TestApierExecuteAction(t *testing.T) {
}
reply2 := ""
// Add balance to an account which does n exist
attrs = AttrExecuteAction{ Direction: "*out", Tenant: "cgrates.org", Account: "dan2", ActionsId: "DUMMY_ACTION"}
attrs = AttrExecuteAction{Direction: "*out", Tenant: "cgrates.org", Account: "dan2", ActionsId: "DUMMY_ACTION"}
if err := rater.Call("ApierV1.ExecuteAction", attrs, &reply2); err == nil || reply2 == "OK" {
t.Error("Expecting error on ApierV1.ExecuteAction.", err, reply2)
}
}
// Test here AddTriggeredAction
func TestApierAddTriggeredAction(t *testing.T) {
if !*testLocal {
@@ -880,7 +883,7 @@ func TestApierAddTriggeredAction(t *testing.T) {
}
reply := ""
// Add balance to a previously known account
attrs := &AttrAddActionTrigger{ Tenant: "cgrates.org", Account: "dan2", Direction: "*out", BalanceId: "*monetary",
attrs := &AttrAddActionTrigger{Tenant: "cgrates.org", Account: "dan2", Direction: "*out", BalanceId: "*monetary",
ThresholdValue: 2, DestinationId: "*any", Weight: 10, ActionsId: "WARN_VIA_HTTP"}
if err := rater.Call("ApierV1.AddTriggeredAction", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.AddTriggeredAction: ", err.Error())
@@ -897,14 +900,13 @@ func TestApierAddTriggeredAction(t *testing.T) {
}
}
// Test here AddAccount
func TestApierAddAccount(t *testing.T) {
if !*testLocal {
return
}
//reply := ""
attrs := &AttrAddAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan4", Type: "prepaid", ActionTimingsId: "PREPAID_10"}
attrs := &AttrAddAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan4", Type: "prepaid", ActionTimingsId: "PREPAID_10"}
//if err := rater.Call("ApierV1.AddAccount", attrs, &reply); err != nil {
// t.Error("Got error on ApierV1.AddAccount: ", err.Error())
//} else if reply != "OK" {
@@ -920,7 +922,6 @@ func TestApierAddAccount(t *testing.T) {
}
}
// Test here GetBalance
func TestApierGetBalance(t *testing.T) {
if !*testLocal {

View File

@@ -63,7 +63,7 @@ func (self *ApierV1) ExportCsvCdrs(attr *AttrExpCsvCdrs, reply *ExportedCsvCdrs)
}
csvWriter := cdrexporter.NewCsvCdrWriter(fileOut, self.Config.RoundingDecimals, self.Config.CDRSExportExtraFields)
for _, cdr := range cdrs {
if err := csvWriter.Write(cdr.(*utils.RatedCDR)); err != nil {
if err := csvWriter.Write(cdr); err != nil {
os.Remove(fileName)
return err
}

Binary file not shown.

View File

@@ -19,44 +19,49 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import (
"fmt"
"bufio"
"encoding/csv"
"errors"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/howeyc/fsnotify"
"os"
"path"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
"bufio"
"encoding/csv"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
)
const (
CSV = "csv"
)
func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
cdrc := &Cdrc{cgrCfg: config}
// Before processing, make sure in and out folders exist
for _, dir := range []string{ cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir } {
for _, dir := range []string{cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return nil, fmt.Errorf("Folder %s does not exist", dir)
}
}
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
return nil, err
if cdrc.cgrCfg.CdrcCdrType == CSV {
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
return nil, err
}
}
cdrc.httpClient = new(http.Client)
return cdrc, nil
}
type Cdrc struct {
cgrCfg *config.CGRConfig
fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
httpClient *http.Client
cgrCfg *config.CGRConfig
fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
httpClient *http.Client
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -66,13 +71,7 @@ func (self *Cdrc) Run() error {
}
// No automated, process and sleep approach
for {
engine.Logger.Info(fmt.Sprintf("Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir))
filesInDir,_ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir)
for _, file := range filesInDir {
if err := self.processFile(file.Name()); err != nil {
return err
}
}
self.processCdrDir()
time.Sleep(self.cgrCfg.CdrcRunDelay)
}
return nil
@@ -81,9 +80,7 @@ func (self *Cdrc) Run() error {
// Parses fieldIndex strings into fieldIndex integers needed
func (self *Cdrc) parseFieldIndexesFromConfig() error {
var err error
// Add main fields here
self.fieldIndxes = make(map[string]int)
// PrimaryCdrFields []string = []string{ACCID, CDRHOST, CDRSOURCE, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION}
fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField,
self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField}
@@ -121,6 +118,18 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
return v, nil
}
// One run over the CDR folder
func (self *Cdrc) processCdrDir() error {
engine.Logger.Info(fmt.Sprintf("Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir))
filesInDir, _ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir)
for _, file := range filesInDir {
if err := self.processFile(path.Join(self.cgrCfg.CdrcCdrInDir, file.Name())); err != nil {
return err
}
}
return nil
}
// Watch the specified folder for file moves and parse the files on events
func (self *Cdrc) trackCDRFiles() (err error) {
watcher, err := fsnotify.NewWatcher()
@@ -164,9 +173,9 @@ func (self *Cdrc) processFile(filePath string) error {
if err != nil {
engine.Logger.Err(err.Error())
continue
}
}
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil {
engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s",err.Error()))
engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s", err.Error()))
continue
}
}

144
cdrc/cdrc_local_test.go Normal file
View File

@@ -0,0 +1,144 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"errors"
"flag"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"io/ioutil"
"os/exec"
"path"
"testing"
"time"
)
/*
README:
Enable local tests by passing '-local' to the go test command
It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
Prior running the tests, create database and users by running:
mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
What these tests do:
* Flush tables in storDb.
* Start engine with default configuration and give it some time to listen (here caching can slow down).
*
*/
var cfg *config.CGRConfig
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates/data", "CGR data dir path here")
var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database <mysql>")
var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
func init() {
cfgPath := path.Join(*dataDir, "conf", "cgrates.cfg")
cfg, _ = config.NewCGRConfig(&cfgPath)
}
var fileContent1 = `accid11,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid12,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
dummy_data
accid13,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
`
var fileContent2 = `accid21,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid22,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
#accid1,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid23,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1`
func startEngine() error {
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
return errors.New("Cannot find cgr-engine executable")
}
stopEngine()
engine := exec.Command(enginePath, "-cdrs", "-config", path.Join(*dataDir, "conf", "cgrates.cfg"))
if err := engine.Start(); err != nil {
return fmt.Errorf("Cannot start cgr-engine: %s", err.Error())
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
return nil
}
func stopEngine() error {
exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
return nil
}
func TestEmptyTables(t *testing.T) {
if !*testLocal {
return
}
if *storDbType != utils.MYSQL {
t.Fatal("Unsupported storDbType")
}
var mysql *engine.MySQLStorage
if d, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
t.Fatal("Error on opening database connection: ", err)
} else {
mysql = d.(*engine.MySQLStorage)
}
for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL, engine.CREATE_TARIFFPLAN_TABLES_SQL} {
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil {
t.Fatal("Error on mysql creation: ", err.Error())
return // No point in going further
}
}
for _, tbl := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA} {
if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", tbl)); err != nil {
t.Fatal(err.Error())
}
}
}
// Creates cdr files and starts the engine
func TestCreateCdrFiles(t *testing.T) {
if !*testLocal {
return
}
if err := ioutil.WriteFile(path.Join(cfg.CdrcCdrInDir, "file1.csv"), []byte(fileContent1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := ioutil.WriteFile(path.Join(cfg.CdrcCdrInDir, "file2.csv"), []byte(fileContent2), 0644); err != nil {
t.Fatal(err.Error)
}
}
func TestProcessCdrDir(t *testing.T) {
if !*testLocal {
return
}
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cfg)
if err != nil {
t.Fatal(err.Error())
}
if err := cdrc.processCdrDir(); err != nil {
t.Error(err)
}
stopEngine()
}

View File

@@ -1,34 +1,60 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"testing"
)
var cgrConfig *config.CGRConfig
var cdrc *Cdrc
func init() {
cgrConfig, _ = config.NewDefaultCGRConfig()
cdrc = &Cdrc{cgrCfg:cgrConfig}
}
func TestParseFieldIndexesFromConfig(t *testing.T) {
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
t.Error("Failed parsing default fieldIndexesFromConfig", err)
// Test default config
cgrConfig, _ := config.NewDefaultCGRConfig()
// Test primary field index definition
cgrConfig.CdrcAccIdField = "detect_me"
cdrc := &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err == nil {
t.Error("Failed detecting error in accounting id definition", err)
}
// Test extra field index definition
cgrConfig.CdrcAccIdField = "0" // Put back as int
cgrConfig.CdrcExtraFields = []string{"supplier1", "11:orig_ip"}
cdrc = &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err == nil {
t.Error("Failed detecting error in extra fields definition", err)
}
}
func TestCdrAsHttpForm(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrc := &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
t.Error("Failed parsing default fieldIndexesFromConfig", err)
}
cdrRow := []string{"firstField", "secondField"}
_, err := cdrc.cdrAsHttpForm(cdrRow)
if err == nil {
t.Error("Failed to corectly detect missing fields from record")
}
cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow);
cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow)
if err != nil {
t.Error("Failed to parse CDR in form", err)
}

View File

@@ -32,6 +32,7 @@ func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) {
}
}
cgrCdr := make(CgrCdr)
cgrCdr[utils.CDRHOST] = req.RemoteAddr
for k, vals := range req.Form {
cgrCdr[k] = vals[0] // We only support the first value for now, if more are provided it is considered remote's fault
}
@@ -94,7 +95,8 @@ func (cgrCdr CgrCdr) GetExtraFields() map[string]string {
return extraFields
}
func (cgrCdr CgrCdr) GetAnswerTime() (t time.Time, err error) {
return utils.ParseDate(cgrCdr[utils.ANSWER_TIME])
//return utils.ParseDate(cgrCdr[utils.ANSWER_TIME])
return time.Parse("2006-01-02 15:04:05", cgrCdr[utils.ANSWER_TIME])
}
// Extracts duration as considered by the telecom switch

View File

@@ -60,6 +60,8 @@ var (
version = flag.Bool("version", false, "Prints the application version.")
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config")
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config")
bal = balancer2go.NewBalancer()
exitChan = make(chan bool)
sm sessionmanager.SessionManager
@@ -345,6 +347,12 @@ func main() {
if *schedEnabled {
cfg.SchedulerEnabled = *schedEnabled
}
if *cdrsEnabled {
cfg.CDRSEnabled = *cdrsEnabled
}
if *cdrcEnabled {
cfg.CdrcEnabled = *cdrcEnabled
}
if cfg.RaterEnabled {
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %v", err))

View File

@@ -165,15 +165,15 @@ func (self *CGRConfig) setDefaults() error {
self.CDRSListen = "127.0.0.1:2022"
self.CDRSExtraFields = []string{}
self.CDRSMediator = ""
self.CDRSExportPath = "/var/log/cgrates/cdr/out"
self.CDRSExportPath = "/var/log/cgrates/cdr/cdrexport/csv"
self.CDRSExportExtraFields = []string{}
self.CdrcEnabled = false
self.CdrcCdrs = "127.0.0.1:2022"
self.CdrcCdrsMethod = "http_cgr"
self.CdrcRunDelay = time.Duration(0)
self.CdrcCdrType = "csv"
self.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
self.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
self.CdrcCdrInDir = "/var/log/cgrates/cdr/cdrc/in"
self.CdrcCdrOutDir = "/var/log/cgrates/cdr/cdrc/out"
self.CdrcSourceId = "freeswitch_csv"
self.CdrcAccIdField = "0"
self.CdrcReqTypeField = "1"

View File

@@ -76,8 +76,8 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcCdrsMethod = "http_cgr"
eCfg.CdrcRunDelay = time.Duration(0)
eCfg.CdrcCdrType = "csv"
eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/cdrc/in"
eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/cdrc/out"
eCfg.CdrcSourceId = "freeswitch_csv"
eCfg.CdrcAccIdField = "0"
eCfg.CdrcReqTypeField = "1"
@@ -91,7 +91,7 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcDurationField = "9"
eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
eCfg.CDRSMediator = ""
eCfg.CDRSExportPath = "/var/log/cgrates/cdr/out"
eCfg.CDRSExportPath = "/var/log/cgrates/cdr/cdrexport/csv"
eCfg.CDRSExportExtraFields = []string{}
eCfg.MediatorEnabled = false
eCfg.MediatorListen = "127.0.0.1:2032"

View File

@@ -20,9 +20,9 @@ package console
import (
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"time"
)
func init() {
@@ -44,7 +44,7 @@ func (self *CmdGetCost) Usage(name string) string {
// set param defaults
func (self *CmdGetCost) defaults() error {
self.rpcMethod = "Responder.GetCost"
self.rpcParams = &engine.CallDescriptor{ Direction: "*out" }
self.rpcParams = &engine.CallDescriptor{Direction: "*out"}
return nil
}
@@ -60,7 +60,7 @@ func (self *CmdGetCost) FromArgs(args []string) error {
if args[6] == "*now" {
tStart = time.Now()
} else {
tStart,err = utils.ParseDate(args[6])
tStart, err = utils.ParseDate(args[6])
if err != nil {
fmt.Println("\n*start_time* should have one of the formats:")
fmt.Println("\ttime.RFC3339\teg:2013-08-07T17:30:00Z in UTC")

View File

@@ -18,7 +18,9 @@ mkdir -p $PKG_DIR/usr/share/cgrates
cp -r ../../data/ $PKG_DIR/usr/share/cgrates/
mkdir -p $PKG_DIR/usr/bin
cp /usr/local/goapps/bin/cgr-* $PKG_DIR/usr/bin/
mkdir -p $PKG_DIR/var/log/cgrates/cdr/out
mkdir -p $PKG_DIR/var/log/cgrates/cdr/cdrc/in
mkdir -p $PKG_DIR/var/log/cgrates/cdr/cdrc/out
mkdir -p $PKG_DIR/var/log/cgrates/cdr/cdrexport/csv
mkdir -p $PKG_DIR/var/log/cgrates/history
dpkg-deb --build $PKG_DIR

View File

@@ -20,10 +20,10 @@ package engine
import (
"flag"
"path"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"path"
"testing"
)
/*
@@ -40,7 +40,7 @@ README:
*/
// Globals used
var ratingDbCsv, ratingDbStor, ratingDbApier RatingStorage // Each ratingDb will have it's own sources to collect data
var ratingDbCsv, ratingDbStor, ratingDbApier RatingStorage // Each ratingDb will have it's own sources to collect data
var accountDbCsv, accountDbStor, accountDbApier AccountingStorage // Each ratingDb will have it's own sources to collect data
var storDb LoadStorage
var cfg *config.CGRConfig
@@ -67,15 +67,15 @@ func TestConnDataDbs(t *testing.T) {
if ratingDbApier, err = ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, "6", cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding); err != nil {
t.Fatal("Error on ratingDb connection: ", err.Error())
}
if accountDbCsv, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "7",
if accountDbCsv, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "7",
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
t.Fatal("Error on ratingDb connection: ", err.Error())
}
if accountDbStor, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "8",
if accountDbStor, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "8",
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
t.Fatal("Error on ratingDb connection: ", err.Error())
}
if accountDbApier, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "9",
if accountDbApier, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "9",
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
t.Fatal("Error on ratingDb connection: ", err.Error())
}

View File

@@ -93,8 +93,8 @@ type AccountingStorage interface {
type CdrStorage interface {
Storage
SetCdr(utils.CDR) error
SetRatedCdr(utils.CDR, *CallCost, string) error
GetRatedCdrs(time.Time, time.Time) ([]utils.CDR, error)
SetRatedCdr(utils.CDR, string, *CallCost, string) error
GetRatedCdrs(time.Time, time.Time) ([]*utils.RatedCDR, error)
}
type LogStorage interface {

View File

@@ -22,11 +22,11 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"strings"
"time"
)
type MapStorage struct {

View File

@@ -688,15 +688,18 @@ func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Acti
func (self *SQLStorage) LogError(uuid, source, errstr string) (err error) { return }
func (self *SQLStorage) SetCdr(cdr utils.CDR) (err error) {
// map[account:1001 direction:out orig_ip:172.16.1.1 tor:call accid:accid23 answer_time:2013-02-03 19:54:00 cdrsource:freeswitch_csv destination:+4986517174963 duration:62 reqtype:prepaid subject:1001 supplier:supplier1 tenant:cgrates.org]
startTime, err := cdr.GetAnswerTime()
if err != nil {
Logger.Info(err.Error())
return err
}
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (NULL, '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d)",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (NULL,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %d)",
utils.TBL_CDRS_PRIMARY,
cdr.GetCgrId(),
cdr.GetAccId(),
cdr.GetCdrHost(),
cdr.GetCdrSource(),
cdr.GetReqType(),
cdr.GetDirection(),
cdr.GetTenant(),
@@ -726,11 +729,12 @@ func (self *SQLStorage) SetCdr(cdr utils.CDR) (err error) {
return
}
func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) (err error) {
func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, runId string, cc *CallCost, extraInfo string) (err error) {
// ToDo: Add here source and subject
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, subject, cost, extra_info) VALUES ('%s', '%s', %f, '%s')",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,runid,subject,cost,extra_info) VALUES ('%s','%s','%s',%f,'%s')",
utils.TBL_RATED_CDRS,
cdr.GetCgrId(),
runId,
cdr.GetSubject(),
cc.Cost+cc.ConnectFee,
extraInfo))
@@ -742,9 +746,9 @@ func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo strin
}
// Return a slice of rated CDRs from storDb using optional timeStart and timeEnd as filters.
func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]utils.CDR, error) {
var cdrs []utils.CDR
q := fmt.Sprintf("SELECT %s.cgrid,accid,cdrhost,reqtype,direction,tenant,tor,account,%s.subject,destination,answer_time,duration,extra_fields,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS)
func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]*utils.RatedCDR, error) {
var cdrs []*utils.RatedCDR
q := fmt.Sprintf("SELECT %s.cgrid,accid,cdrhost,cdrsource,reqtype,direction,tenant,tor,account,%s.subject,destination,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS)
if !timeStart.IsZero() && !timeEnd.IsZero() {
q += fmt.Sprintf(" WHERE answer_time>='%s' AND answer_time<'%s'", timeStart, timeEnd)
} else if !timeStart.IsZero() {
@@ -758,12 +762,13 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]utils.CDR,
}
defer rows.Close()
for rows.Next() {
var cgrid, accid, cdrhost, reqtype, direction, tenant, tor, account, subject, destination string
var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, tor, account, subject, destination, runid string
var extraFields []byte
var answerTimestamp, duration int64
var cost float64
var extraFieldsMp map[string]string
if err := rows.Scan(&cgrid, &accid, &cdrhost, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &answerTimestamp, &duration, &extraFields, &cost); err != nil {
if err := rows.Scan(&cgrid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &answerTimestamp, &duration,
&extraFields, &runid, &cost); err != nil {
return nil, err
}
answerTime := time.Unix(answerTimestamp, 0)
@@ -771,11 +776,11 @@ func (self *SQLStorage) GetRatedCdrs(timeStart, timeEnd time.Time) ([]utils.CDR,
return nil, err
}
storCdr := &utils.RatedCDR{
CgrId: cgrid, AccId: accid, CdrHost: cdrhost, ReqType: reqtype, Direction: direction, Tenant: tenant,
CgrId: cgrid, AccId: accid, CdrHost: cdrhost, CdrSource: cdrsrc, ReqType: reqtype, Direction: direction, Tenant: tenant,
TOR: tor, Account: account, Subject: subject, Destination: destination, AnswerTime: answerTime, Duration: duration,
ExtraFields: extraFieldsMp, Cost: cost,
ExtraFields: extraFieldsMp, MediationRunId: runid, Cost: cost,
}
cdrs = append(cdrs, utils.CDR(storCdr))
cdrs = append(cdrs, storCdr)
}
return cdrs, nil
}

View File

@@ -42,10 +42,10 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str
d, err = NewRedisStorage(host, db_nb, pass, marshaler)
db = d.(RatingStorage)
/*
// Add here as soon as interface implemented
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(RatingStorage)
// Add here as soon as interface implemented
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(RatingStorage)
*/
default:
err = errors.New("unknown db")
@@ -72,9 +72,9 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler
d, err = NewRedisStorage(host, db_nb, pass, marshaler)
db = d.(AccountingStorage)
/*
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(AccountingStorage)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(AccountingStorage)
*/
default:
err = errors.New("unknown db")
@@ -89,24 +89,24 @@ func ConfigureLogStorage(db_type, host, port, name, user, pass, marshaler string
var d Storage
switch db_type {
/*
case utils.REDIS:
var db_nb int
db_nb, err = strconv.Atoi(name)
if err != nil {
Logger.Crit("Redis db name must be an integer!")
return nil, err
}
if port != "" {
host += ":" + port
}
d, err = NewRedisStorage(host, db_nb, pass, marshaler)
db = d.(LogStorage)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(LogStorage)
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(LogStorage)
case utils.REDIS:
var db_nb int
db_nb, err = strconv.Atoi(name)
if err != nil {
Logger.Crit("Redis db name must be an integer!")
return nil, err
}
if port != "" {
host += ":" + port
}
d, err = NewRedisStorage(host, db_nb, pass, marshaler)
db = d.(LogStorage)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass)
db = d.(LogStorage)
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(LogStorage)
*/
case utils.MYSQL:
d, err = NewMySQLStorage(host, port, name, user, pass)
@@ -124,9 +124,9 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin
var d Storage
switch db_type {
/*
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(LoadStorage)
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(LoadStorage)
*/
case utils.MYSQL:
d, err = NewMySQLStorage(host, port, name, user, pass)
@@ -144,9 +144,9 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string) (db CdrSt
var d Storage
switch db_type {
/*
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(CdrStorage)
case utils.POSTGRES:
d, err = NewPostgresStorage(host, port, name, user, pass)
db = d.(CdrStorage)
*/
case utils.MYSQL:
d, err = NewMySQLStorage(host, port, name, user, pass)

View File

@@ -43,36 +43,21 @@ func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engi
}
type Mediator struct {
connector engine.Connector
logDb engine.LogStorage
cdrDb engine.CdrStorage
cgrCfg *config.CGRConfig
connector engine.Connector
logDb engine.LogStorage
cdrDb engine.CdrStorage
cgrCfg *config.CGRConfig
}
/*
Responsible for parsing configuration fields out of CGRConfig instance, doing the necessary pre-checks.
@param cfgVals: keep ordered references to configuration fields from CGRConfig instance.
fieldKeys and cfgVals are directly related through index.
Method logic:
* Make sure the field used as reference in mediation process loop is not empty.
* All other fields should match the length of reference field.
* Accounting id field should not be empty.
* If we run mediation on csv file:
* Make sure cdrInDir and cdrOutDir are valid paths.
* Populate fieldIdxs by converting fieldNames into integers
*/
func (self *Mediator) parseConfig() error {
cfgVals := [][]string{self.cgrCfg.MediatorSubjectFields, self.cgrCfg.MediatorReqTypeFields, self.cgrCfg.MediatorDirectionFields,
self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields,
self.cgrCfg.MediatorAnswerTimeFields, self.cgrCfg.MediatorDurationFields}
if len(self.cgrCfg.MediatorRunIds) == 0 {
return errors.New("Unconfigured mediator run_ids")
}
// All other configured fields must match the length of reference fields
for iCfgVal := range cfgVals {
if len(self.cgrCfg.MediatorRunIds) != len(cfgVals[iCfgVal]) {
// Make sure we have everywhere the length of runIds
// Make sure we have everywhere the length of runIds
return errors.New("Inconsistent lenght of mediator fields.")
}
}
@@ -132,8 +117,6 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error)
return cc, err
}
func (self *Mediator) MediateDBCDR(cdr utils.CDR) error {
var qryCC *engine.CallCost
cc := &engine.CallCost{Cost: -1}
@@ -154,5 +137,5 @@ func (self *Mediator) MediateDBCDR(cdr utils.CDR) error {
if errCost != nil {
extraInfo = errCost.Error()
}
return self.cdrDb.SetRatedCdr(cdr, cc, extraInfo)
return self.cdrDb.SetRatedCdr(cdr, cdr.GetReqType(), cc, extraInfo)
}

View File

@@ -154,5 +154,5 @@ func ParseDurationWithSecs(durStr string) (time.Duration, error) {
if _, err := strconv.Atoi(durStr); err == nil { // No suffix, default to seconds
durStr += "s"
}
return time.ParseDuration(durStr)
return time.ParseDuration(durStr)
}

View File

@@ -22,77 +22,22 @@ import (
"time"
)
// CDR as extracted from StorDb. Kinda standard of internal CDR
// Rated CDR as extracted from StorDb. Kinda standard of internal CDR
type RatedCDR struct {
CgrId string
AccId string
CdrHost string
CdrSource string
ReqType string
Direction string
Tenant string
TOR string
Account string
Subject string
Destination string
AnswerTime time.Time
Duration int64
ExtraFields map[string]string
Cost float64
}
func (ratedCdr *RatedCDR) GetCgrId() string {
return ratedCdr.CgrId
}
func (ratedCdr *RatedCDR) GetAccId() string {
return ratedCdr.AccId
}
func (ratedCdr *RatedCDR) GetCdrHost() string {
return ratedCdr.CdrHost
}
func (ratedCdr *RatedCDR) GetCdrSource() string {
return ratedCdr.CdrSource
}
func (ratedCdr *RatedCDR) GetDirection() string {
return ratedCdr.Direction
}
func (ratedCdr *RatedCDR) GetSubject() string {
return ratedCdr.Subject
}
func (ratedCdr *RatedCDR) GetAccount() string {
return ratedCdr.Account
}
func (ratedCdr *RatedCDR) GetDestination() string {
return ratedCdr.Destination
}
func (ratedCdr *RatedCDR) GetTOR() string {
return ratedCdr.TOR
}
func (ratedCdr *RatedCDR) GetTenant() string {
return ratedCdr.Tenant
}
func (ratedCdr *RatedCDR) GetReqType() string {
return ratedCdr.ReqType
}
func (ratedCdr *RatedCDR) GetAnswerTime() (time.Time, error) {
return ratedCdr.AnswerTime, nil
}
func (ratedCdr *RatedCDR) GetDuration() int64 {
return ratedCdr.Duration
}
func (ratedCdr *RatedCDR) GetExtraFields() map[string]string {
return ratedCdr.ExtraFields
CgrId string
AccId string
CdrHost string
CdrSource string
ReqType string
Direction string
Tenant string
TOR string
Account string
Subject string
Destination string
AnswerTime time.Time
Duration int64
ExtraFields map[string]string
MediationRunId string
Cost float64
}

View File

@@ -250,7 +250,7 @@ func TestSplitPrefixEmpty(t *testing.T) {
func TestParseDurationWithSecs(t *testing.T) {
durStr := "2"
durExpected := time.Duration(2)*time.Second
durExpected := time.Duration(2) * time.Second
if parsed, err := ParseDurationWithSecs(durStr); err != nil {
t.Error(err)
} else if parsed != durExpected {
@@ -263,12 +263,10 @@ func TestParseDurationWithSecs(t *testing.T) {
t.Error("Parsed different than expected")
}
durStr = "2ms"
durExpected = time.Duration(2)*time.Millisecond
durExpected = time.Duration(2) * time.Millisecond
if parsed, err := ParseDurationWithSecs(durStr); err != nil {
t.Error(err)
} else if parsed != durExpected {
t.Error("Parsed different than expected")
}
}