Refactored CDRs internal functions, better CDRC .csv testing

This commit is contained in:
DanB
2016-05-01 17:06:25 +02:00
parent b303ff980e
commit 4fb9a641ee
9 changed files with 364 additions and 330 deletions

View File

@@ -104,6 +104,13 @@ func TestCDRStatsLclGetQueueIds2(t *testing.T) {
} else if len(eQueueIds) != len(queueIds) {
t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
}
var rcvMetrics map[string]float64
expectedMetrics := map[string]float64{"ASR": -1, "ACD": -1}
if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics); err != nil {
t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
} else if !reflect.DeepEqual(expectedMetrics, rcvMetrics) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics, rcvMetrics)
}
}
func TestCDRStatsLclPostCdrs(t *testing.T) {
@@ -112,28 +119,28 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
}
httpClient := new(http.Client)
storedCdrs := []*engine.CDR{
&engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
&engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafa",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
&engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
&engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafb",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID,
Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
&engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
&engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafc",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Now(),
RunID: utils.DEFAULT_RUNID,
Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
&engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
&engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafd",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Time{},
@@ -146,7 +153,7 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
t.Error(err.Error())
}
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
time.Sleep(time.Duration(1000) * time.Millisecond)
}
func TestCDRStatsLclGetMetrics1(t *testing.T) {
@@ -204,3 +211,12 @@ func TestCDRStatsLclResetMetrics(t *testing.T) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
}
}
func TestCDRStatsLclKillEngine(t *testing.T) {
if !*testLocal {
return
}
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -1,223 +0,0 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
/*
README:
Enable local tests by passing '-local' to the go test command
It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
Prior running the tests, create database and users by running:
mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
What these tests do:
* Flush tables in storDb.
* Start engine with default configuration and give it some time to listen (here caching can slow down).
*
*/
var cfgPath string
var cfg *config.CGRConfig
var cdrcCfgs []*config.CdrcConfig
var cdrcCfg *config.CdrcConfig
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database <mysql>")
var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
var fileContent1 = `accid11,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid12,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
dummy_data
accid13,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
`
var fileContent2 = `accid21,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid22,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
#accid1,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid23,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1`
var fileContent3 = `accid31;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
accid32;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
#accid1;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
accid33;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1`
func startEngine() error {
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
return errors.New("Cannot find cgr-engine executable")
}
stopEngine()
engine := exec.Command(enginePath, "-config", cfgPath)
if err := engine.Start(); err != nil {
return fmt.Errorf("Cannot start cgr-engine: %s", err.Error())
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
return nil
}
func stopEngine() error {
exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
return nil
}
// Need it here and not in init since Travis has no possibility to load local file
func TestCsvLclLoadConfigt(*testing.T) {
if !*testLocal {
return
}
cfgPath = path.Join(*dataDir, "conf", "samples", "apier")
cfg, _ = config.NewCGRConfigFromFolder(cfgPath)
if len(cfg.CdrcProfiles) > 0 {
cdrcCfgs = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"]
}
}
func TestCsvLclEmptyTables(t *testing.T) {
if !*testLocal {
return
}
if *storDbType != utils.MYSQL {
t.Fatal("Unsupported storDbType")
}
mysql, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
if err != nil {
t.Fatal("Error on opening database connection: ", err)
}
for _, scriptName := range []string{utils.CREATE_CDRS_TABLES_SQL, utils.CREATE_TARIFFPLAN_TABLES_SQL} {
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil {
t.Fatal("Error on mysql creation: ", err.Error())
return // No point in going further
}
}
if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", utils.TBL_CDRS)); err != nil {
t.Fatal(err.Error())
}
}
// Creates cdr files and starts the engine
func TestCsvLclCreateCdrFiles(t *testing.T) {
if !*testLocal {
return
}
if cdrcCfgs == nil {
t.Fatal("Empty default cdrc configuration")
}
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
}
if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err)
}
if err := os.RemoveAll(cdrcCfg.CdrOutDir); err != nil {
t.Fatal("Error removing folder: ", cdrcCfg.CdrOutDir, err)
}
if err := os.MkdirAll(cdrcCfg.CdrOutDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cdrcCfg.CdrOutDir, err)
}
if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file1.csv"), []byte(fileContent1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file2.csv"), []byte(fileContent2), 0644); err != nil {
t.Fatal(err.Error)
}
}
func TestCsvLclProcessCdrDir(t *testing.T) {
if !*testLocal {
return
}
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
for _, cdrsConn := range cdrcCfg.CdrsConns {
if cdrsConn.Address == utils.MetaInternal {
cdrsConn.Address = "127.0.0.1:2013"
}
}
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "")
if err != nil {
t.Fatal(err.Error())
}
if err := cdrc.processCdrDir(); err != nil {
t.Error(err)
}
stopEngine()
}
// Creates cdr files and starts the engine
func TestCsvLclCreateCdr3File(t *testing.T) {
if !*testLocal {
return
}
if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
}
if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err)
}
if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil {
t.Fatal(err.Error)
}
}
func TestCsvLclProcessCdr3Dir(t *testing.T) {
if !*testLocal {
return
}
for _, cdrsConn := range cdrcCfg.CdrsConns {
if cdrsConn.Address == utils.MetaInternal {
cdrsConn.Address = "127.0.0.1:2013"
}
}
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "")
if err != nil {
t.Fatal(err.Error())
}
if err := cdrc.processCdrDir(); err != nil {
t.Error(err)
}
stopEngine()
}

185
cdrc/csv_it_test.go Normal file
View File

@@ -0,0 +1,185 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"flag"
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
/*
README:
Enable local tests by passing '-local' to the go test command
It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
Prior running the tests, create database and users by running:
mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
What these tests do:
* Flush tables in storDb.
* Start engine with default configuration and give it some time to listen (here caching can slow down).
*
*/
var csvCfgPath string
var csvCfg *config.CGRConfig
var cdrcCfgs []*config.CdrcConfig
var cdrcCfg *config.CdrcConfig
var cdrcRpc *rpc.Client
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var testIT = flag.Bool("integration", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
var fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10s,1.0100,val_extra3,"",val_extra1
dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,*rated,*out,cgrates.org,call,1001,1001,+4986517174964,2013-11-07 09:42:25 +0000 UTC,2013-11-07 09:42:26 +0000 UTC,20s,1.0100,val_extra3,"",val_extra1
`
var fileContent2 = `accid21;*prepaid;itsyscom.com;1001;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1
accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1
#accid1;*pseudoprepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;12;val_extra3;"";val_extra1
accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1`
func TestCsvITInitConfig(t *testing.T) {
if !*testIT {
return
}
var err error
csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsv")
if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil {
t.Fatal("Got config error: ", err.Error())
}
}
// InitDb so we can rely on count
func TestCsvITInitCdrDb(t *testing.T) {
if !*testIT {
return
}
if err := engine.InitStorDb(csvCfg); err != nil {
t.Fatal(err)
}
}
/*
func TestCsvITCreateCdrDirs(t *testing.T) {
if !*testIT {
return
}
for _, cdrcProfiles := range csvCfg.CdrcProfiles {
for _, cdrcInst := range cdrcProfiles {
for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
}
}
}
}
func TestCsvITStartEngine(t *testing.T) {
if !*testIT {
return
}
if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
*/
// Connect rpc client to rater
func TestCsvITRpcConn(t *testing.T) {
if !*testIT {
return
}
var err error
cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
// The default scenario, out of cdrc defined in .cfg file
func TestCsvITHandleCdr1File(t *testing.T) {
if !*testIT {
return
}
fileName := "file1.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit1/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
// Scenario out of first .xml config
func TestCsvITHandleCdr2File(t *testing.T) {
if !*testIT {
return
}
fileName := "file2.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent2), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit2/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
func TestCsvITProcessedFiles(t *testing.T) {
if !*testIT {
return
}
time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
if outContent1, err := ioutil.ReadFile("/tmp/cdrctests/csvit1/out/file1.csv"); err != nil {
t.Error(err)
} else if fileContent1 != string(outContent1) {
t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent1))
}
if outContent2, err := ioutil.ReadFile("/tmp/cdrctests/csvit2/out/file2.csv"); err != nil {
t.Error(err)
} else if fileContent2 != string(outContent2) {
t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent2))
}
}
func TestCsvITKillEngine(t *testing.T) {
if !*testIT {
return
}
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -853,7 +853,7 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make([]*CdrcConfig, 0)
}
var cdrcInstCfg *CdrcConfig
if *jsnCrc1Cfg.Id == utils.META_DEFAULT {
if *jsnCrc1Cfg.Id == utils.META_DEFAULT && self.dfltCdrcProfile == nil {
cdrcInstCfg = new(CdrcConfig)
} else {
cdrcInstCfg = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers

View File

@@ -145,8 +145,9 @@
// },
// "cdrc": {
// "*default": {
// "cdrc": [
// {
// "id": "*default", // identifier of the CDRC runner
// "enabled": false, // enable CDR client functionality
// "dry_run": false, // do not send the CDRs to CDRS, just parse them
// "cdrs_conns": [
@@ -181,8 +182,9 @@
// {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true},
// ],
// "trailer_fields": [], // template of the import trailer fields
// }
// },
// },
// ],
// "sm_generic": {
// "enabled": false, // starts SessionManager service: <true|false>

View File

@@ -0,0 +1,67 @@
{
// Real-time Charging System for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"cdrs": {
"enabled": true,
"rals_conns": [], // no rating support, just *raw CDR testing
},
"cdrc": [
{
"id": "*default",
"enabled": true,
"cdr_in_dir": "/tmp/cdrctests/csvit1/in",
"cdr_out_dir": "/tmp/cdrctests/csvit1/out",
"cdr_source_id": "csvit1",
// "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
// {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true},
// {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "3", "mandatory": true},
// {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "4", "mandatory": true},
// {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "5", "mandatory": true},
// {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "6", "mandatory": true},
// {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "7", "mandatory": true},
// {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "8", "mandatory": true},
// {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "9", "mandatory": true},
// {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "10", "mandatory": true},
// {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "11", "mandatory": true},
// {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "12", "mandatory": true},
// {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true},
// ],
},
{
"id": "*CSVit2", // identifier of the CDRC runner
"enabled": true, // enable CDR client functionality
"field_separator": ";",
"cdr_in_dir": "/tmp/cdrctests/csvit2/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cdrctests/csvit2/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "csvit2", // free form field, tag identifying the source of the CDRs within CDRS database
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true},
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true},
{"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true},
{"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true},
{"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true},
{"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true},
],
},
],
}

View File

@@ -1,6 +1,6 @@
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,*default,rif,rif,0;2,CDRST3_WARN_ASR
CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,*default,,,,CDRST4_WARN_ASR
CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
1 #Id[0] QueueLength[1] TimeWindow[2] SaveInerval[3] Metric[4] SetupInterval[5] TOR[6] CdrHost[7] CdrSource[8] ReqType[9] Direction[10] Tenant[11] Category[12] Account[13] Subject[14] DestinationPrefix[15] PddInterval[16] UsageInterval[17] Supplier[18] DisconnectCause[19] MediationRunIds[20] RatedAccount[21] RatedSubject[22] CostInterval[23] Triggers[24]
2 CDRST3 5 60m ASR 2014-07-29T15:00:00Z;2014-07-29T16:00:00Z *voice 87.139.12.167 FS_JSON rated *out cgrates.org call dan dan +49 5m;10m default *default rif rif 0;2 CDRST3_WARN_ASR
3 CDRST3 ACD CDRST3_WARN_ACD
4 CDRST3 ACC CDRST3_WARN_ACC
5 CDRST4 10 0 ASR cgrates.org call *default CDRST4_WARN_ASR
6 CDRST4 ACD CDRST4_WARN_ACD

View File

@@ -86,13 +86,13 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, use
if stats == nil || reflect.ValueOf(stats).IsNil() {
stats = nil
}
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
}
type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
client rpcclient.RpcClientConnection
rals rpcclient.RpcClientConnection
pubsub rpcclient.RpcClientConnection
users rpcclient.RpcClientConnection
aliases rpcclient.RpcClientConnection
@@ -183,23 +183,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err
return err
}
for _, cdr := range cdrs {
// replace user profile fields
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
return err
}
// replace aliases for cases they were loaded after CDR received
if err := LoadAlias(&AttrMatchingAlias{
Destination: cdr.Destination,
Direction: cdr.Direction,
Tenant: cdr.Tenant,
Category: cdr.Category,
Account: cdr.Account,
Subject: cdr.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
return err
}
if err := self.rateStoreStatsReplicate(cdr, sendToStats); err != nil {
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
@@ -223,7 +207,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
if cdr.Subject == "" { // Use account information as rating subject if missing
cdr.Subject = cdr.Account
}
if !cdr.Rated {
if !cdr.Rated { // Enforce the RunID if CDR is not rated
cdr.RunID = utils.MetaRaw
}
if self.cgrCfg.CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status
@@ -231,24 +215,92 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
cdr.CostDetails.UpdateCost()
cdr.CostDetails.UpdateRatedUsage()
}
if err := self.cdrDb.SetCDR(cdr, false); err != nil { // Only original CDR stored in primary table, no derived
if err := self.cdrDb.SetCDR(cdr, false); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing primary CDR %+v, got error: %s", cdr, err.Error()))
return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
}
}
go self.deriveRateStoreStatsReplicate(cdr)
// Attach raw CDR to stats
if self.stats != nil { // Send raw CDR to stats
var out int
go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out)
}
if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR
go self.replicateCdr(cdr)
}
if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating
go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0)
}
return nil
}
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error {
func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, replicate bool) error {
cdrRuns, err := self.deriveCdrs(cdr)
if err != nil {
return err
}
var ratedCDRs []*CDR // Gather all CDRs received from rating subsystem
for _, cdrRun := range cdrRuns {
if err := self.rateStoreStatsReplicate(cdrRun, true); err != nil {
return err
utils.Logger.Debug(fmt.Sprintf("Processing CDR run: %+v", cdrRun))
if err := LoadUserProfile(cdrRun, utils.EXTRA_FIELDS); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> UserS handling for CDR %+v, got error: %s", cdrRun, err.Error()))
continue
}
if err := LoadAlias(&AttrMatchingAlias{
Destination: cdrRun.Destination,
Direction: cdrRun.Direction,
Tenant: cdrRun.Tenant,
Category: cdrRun.Category,
Account: cdrRun.Account,
Subject: cdrRun.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, cdrRun, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("<CDRS> Aliasing CDR %+v, got error: %s", cdrRun, err.Error()))
continue
}
rcvRatedCDRs, err := self.rateCDR(cdrRun)
if err != nil {
cdrRun.Cost = -1.0 // If there was an error, mark the CDR
cdrRun.ExtraInfo = err.Error()
rcvRatedCDRs = []*CDR{cdrRun}
}
ratedCDRs = append(ratedCDRs, rcvRatedCDRs...)
}
// Request should be processed by SureTax
for _, ratedCDR := range ratedCDRs {
if ratedCDR.RunID == utils.META_SURETAX {
if err := SureTaxProcessCdr(ratedCDR); err != nil {
ratedCDR.Cost = -1.0
ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
}
}
}
// Store rated CDRs
if store {
for _, ratedCDR := range ratedCDRs {
if ratedCDR.CostDetails != nil {
ratedCDR.CostDetails.UpdateCost()
ratedCDR.CostDetails.UpdateRatedUsage()
}
if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
}
}
}
// Attach CDR to stats
if stats { // Send CDR to stats
for _, ratedCDR := range ratedCDRs {
var out int
if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not send CDR to stats: %s", err.Error()))
}
}
}
if replicate {
for _, ratedCDR := range ratedCDRs {
self.replicateCdr(ratedCDR)
}
}
return nil
@@ -259,6 +311,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs
return cdrRuns, nil
}
cdr.RunID = utils.META_DEFAULT // Rewrite *raw with *default since we have it as first run
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
return nil, err
}
@@ -276,7 +329,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: cdr.Direction,
Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination}
var dcs utils.DerivedChargers
if err := self.client.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
if err := self.rals.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", cdr.CGRID, err.Error()))
return nil, err
}
@@ -321,73 +374,6 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
return cdrRuns, nil
}
func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR, sendToStats bool) error {
if cdr.RunID == utils.MetaRaw { // Overwrite *raw with *default for rating
cdr.RunID = utils.META_DEFAULT
}
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
return err
}
if err := LoadAlias(&AttrMatchingAlias{
Destination: cdr.Destination,
Direction: cdr.Direction,
Tenant: cdr.Tenant,
Category: cdr.Category,
Account: cdr.Account,
Subject: cdr.Subject,
Context: utils.ALIAS_CONTEXT_RATING,
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
return err
}
// Rate CDR, can receive multiple due to SMCosts for OriginIDPrefix
var ratedCDRs []*CDR
var err error
if cdr.Rated {
ratedCDRs = []*CDR{cdr}
} else if self.client != nil && !cdr.Rated {
if ratedCDRs, err = self.rateCDR(cdr); err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
ratedCDRs = []*CDR{cdr}
}
}
for _, ratedCDR := range ratedCDRs {
if ratedCDR.RunID == utils.META_SURETAX { // Request should be processed by SureTax
if err := SureTaxProcessCdr(ratedCDR); err != nil {
ratedCDR.Cost = -1.0
ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
}
}
}
if self.cgrCfg.CDRSStoreCdrs { // Store CDRs
for _, ratedCDR := range ratedCDRs {
// Store RatedCDR
if ratedCDR.CostDetails != nil {
ratedCDR.CostDetails.UpdateCost()
ratedCDR.CostDetails.UpdateRatedUsage()
}
if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
}
}
}
// Attach CDR to stats
if self.stats != nil && sendToStats { // Send CDR to stats
for _, ratedCDR := range ratedCDRs {
var out int
if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not send CDR to stats: %s", err.Error()))
}
}
}
if len(self.cgrCfg.CDRSCdrReplication) != 0 {
for _, ratedCDR := range ratedCDRs {
self.replicateCdr(ratedCDR)
}
}
return nil
}
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
@@ -462,9 +448,9 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
PerformRounding: true,
}
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
err = self.client.Call("Responder.Debit", cd, cc)
err = self.rals.Call("Responder.Debit", cd, cc)
} else {
err = self.client.Call("Responder.GetCost", cd, cc)
err = self.rals.Call("Responder.GetCost", cd, cc)
}
if err != nil {
return cc, err

View File

@@ -289,6 +289,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) {
func (s *Stats) AppendCDR(cdr *CDR, out *int) error {
s.mux.RLock()
defer s.mux.RUnlock()
utils.Logger.Debug(fmt.Sprintf("Stats.AppendCDR: %+v", cdr))
for _, sq := range s.queues {
sq.AppendCDR(cdr)
}