diff --git a/apier/v1/cdrstatsv1_local_test.go b/apier/v1/cdrstatsv1_local_test.go
index 9d6b82130..814b36d00 100644
--- a/apier/v1/cdrstatsv1_local_test.go
+++ b/apier/v1/cdrstatsv1_local_test.go
@@ -104,6 +104,13 @@ func TestCDRStatsLclGetQueueIds2(t *testing.T) {
} else if len(eQueueIds) != len(queueIds) {
t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds)
}
+ var rcvMetrics map[string]float64
+ expectedMetrics := map[string]float64{"ASR": -1, "ACD": -1}
+ if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics); err != nil {
+ t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error())
+ } else if !reflect.DeepEqual(expectedMetrics, rcvMetrics) {
+ t.Errorf("Expecting: %v, received: %v", expectedMetrics, rcvMetrics)
+ }
}
func TestCDRStatsLclPostCdrs(t *testing.T) {
@@ -112,28 +119,28 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
}
httpClient := new(http.Client)
storedCdrs := []*engine.CDR{
- &engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
+ &engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafa",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
- &engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
+ &engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafb",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID,
Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
- &engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
+ &engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafc",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Now(),
RunID: utils.DEFAULT_RUNID,
Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
- &engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
+ &engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafd",
OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Time{},
@@ -146,7 +153,7 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
t.Error(err.Error())
}
}
- time.Sleep(time.Duration(*waitRater) * time.Millisecond)
+ time.Sleep(time.Duration(1000) * time.Millisecond)
}
func TestCDRStatsLclGetMetrics1(t *testing.T) {
@@ -204,3 +211,12 @@ func TestCDRStatsLclResetMetrics(t *testing.T) {
t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2)
}
}
+
+func TestCDRStatsLclKillEngine(t *testing.T) {
+ if !*testLocal {
+ return
+ }
+ if err := engine.KillEngine(*waitRater); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go
deleted file mode 100644
index 44832531f..000000000
--- a/cdrc/cdrc_local_test.go
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
-Rating system designed to be used in VoIP Carriers World
-Copyright (C) 2012-2015 ITsysCOM
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package cdrc
-
-import (
- "errors"
- "flag"
- "fmt"
- "io/ioutil"
- "os"
- "os/exec"
- "path"
- "testing"
- "time"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-/*
-README:
-
- Enable local tests by passing '-local' to the go test command
- It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
- Prior running the tests, create database and users by running:
- mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
- What these tests do:
- * Flush tables in storDb.
- * Start engine with default configuration and give it some time to listen (here caching can slow down).
- *
-*/
-
-var cfgPath string
-var cfg *config.CGRConfig
-var cdrcCfgs []*config.CdrcConfig
-var cdrcCfg *config.CdrcConfig
-
-var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
-var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
-var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database ")
-var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
-
-var fileContent1 = `accid11,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-accid12,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-dummy_data
-accid13,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-`
-
-var fileContent2 = `accid21,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-accid22,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-#accid1,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
-accid23,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1`
-
-var fileContent3 = `accid31;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
-accid32;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
-#accid1;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
-accid33;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1`
-
-func startEngine() error {
- enginePath, err := exec.LookPath("cgr-engine")
- if err != nil {
- return errors.New("Cannot find cgr-engine executable")
- }
- stopEngine()
- engine := exec.Command(enginePath, "-config", cfgPath)
- if err := engine.Start(); err != nil {
- return fmt.Errorf("Cannot start cgr-engine: %s", err.Error())
- }
- time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
- return nil
-}
-
-func stopEngine() error {
- exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
- return nil
-}
-
-// Need it here and not in init since Travis has no possibility to load local file
-func TestCsvLclLoadConfigt(*testing.T) {
- if !*testLocal {
- return
- }
- cfgPath = path.Join(*dataDir, "conf", "samples", "apier")
- cfg, _ = config.NewCGRConfigFromFolder(cfgPath)
- if len(cfg.CdrcProfiles) > 0 {
- cdrcCfgs = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"]
- }
-}
-
-func TestCsvLclEmptyTables(t *testing.T) {
- if !*testLocal {
- return
- }
- if *storDbType != utils.MYSQL {
- t.Fatal("Unsupported storDbType")
- }
- mysql, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
- if err != nil {
- t.Fatal("Error on opening database connection: ", err)
- }
- for _, scriptName := range []string{utils.CREATE_CDRS_TABLES_SQL, utils.CREATE_TARIFFPLAN_TABLES_SQL} {
- if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil {
- t.Fatal("Error on mysql creation: ", err.Error())
- return // No point in going further
- }
- }
- if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", utils.TBL_CDRS)); err != nil {
- t.Fatal(err.Error())
- }
-}
-
-// Creates cdr files and starts the engine
-func TestCsvLclCreateCdrFiles(t *testing.T) {
- if !*testLocal {
- return
- }
- if cdrcCfgs == nil {
- t.Fatal("Empty default cdrc configuration")
- }
- for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
- break
- }
- if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
- t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
- }
- if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil {
- t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err)
- }
- if err := os.RemoveAll(cdrcCfg.CdrOutDir); err != nil {
- t.Fatal("Error removing folder: ", cdrcCfg.CdrOutDir, err)
- }
- if err := os.MkdirAll(cdrcCfg.CdrOutDir, 0755); err != nil {
- t.Fatal("Error creating folder: ", cdrcCfg.CdrOutDir, err)
- }
- if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file1.csv"), []byte(fileContent1), 0644); err != nil {
- t.Fatal(err.Error)
- }
- if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file2.csv"), []byte(fileContent2), 0644); err != nil {
- t.Fatal(err.Error)
- }
-
-}
-
-func TestCsvLclProcessCdrDir(t *testing.T) {
- if !*testLocal {
- return
- }
- var cdrcCfg *config.CdrcConfig
- for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
- break
- }
- for _, cdrsConn := range cdrcCfg.CdrsConns {
- if cdrsConn.Address == utils.MetaInternal {
- cdrsConn.Address = "127.0.0.1:2013"
- }
- }
- if err := startEngine(); err != nil {
- t.Fatal(err.Error())
- }
- cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "")
- if err != nil {
- t.Fatal(err.Error())
- }
- if err := cdrc.processCdrDir(); err != nil {
- t.Error(err)
- }
- stopEngine()
-}
-
-// Creates cdr files and starts the engine
-func TestCsvLclCreateCdr3File(t *testing.T) {
- if !*testLocal {
- return
- }
- if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
- t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
- }
- if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil {
- t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err)
- }
- if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil {
- t.Fatal(err.Error)
- }
-}
-
-func TestCsvLclProcessCdr3Dir(t *testing.T) {
- if !*testLocal {
- return
- }
- for _, cdrsConn := range cdrcCfg.CdrsConns {
- if cdrsConn.Address == utils.MetaInternal {
- cdrsConn.Address = "127.0.0.1:2013"
- }
- }
- if err := startEngine(); err != nil {
- t.Fatal(err.Error())
- }
- cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "")
- if err != nil {
- t.Fatal(err.Error())
- }
- if err := cdrc.processCdrDir(); err != nil {
- t.Error(err)
- }
- stopEngine()
-}
diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go
new file mode 100644
index 000000000..ea2491e05
--- /dev/null
+++ b/cdrc/csv_it_test.go
@@ -0,0 +1,185 @@
+/*
+Rating system designed to be used in VoIP Carriers World
+Copyright (C) 2012-2015 ITsysCOM
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package cdrc
+
+import (
+ "flag"
+ "io/ioutil"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "os"
+ "path"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+)
+
+/*
+README:
+
+ Enable local tests by passing '-local' to the go test command
+ It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments.
+ Prior running the tests, create database and users by running:
+ mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql
+ What these tests do:
+ * Flush tables in storDb.
+ * Start engine with default configuration and give it some time to listen (here caching can slow down).
+ *
+*/
+
+var csvCfgPath string
+var csvCfg *config.CGRConfig
+var cdrcCfgs []*config.CdrcConfig
+var cdrcCfg *config.CdrcConfig
+var cdrcRpc *rpc.Client
+
+var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
+var testIT = flag.Bool("integration", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
+var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
+var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache")
+
+var fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10s,1.0100,val_extra3,"",val_extra1
+dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,*rated,*out,cgrates.org,call,1001,1001,+4986517174964,2013-11-07 09:42:25 +0000 UTC,2013-11-07 09:42:26 +0000 UTC,20s,1.0100,val_extra3,"",val_extra1
+`
+
+var fileContent2 = `accid21;*prepaid;itsyscom.com;1001;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1
+accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1
+#accid1;*pseudoprepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;12;val_extra3;"";val_extra1
+accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1`
+
+func TestCsvITInitConfig(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ var err error
+ csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsv")
+ if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil {
+ t.Fatal("Got config error: ", err.Error())
+ }
+}
+
+// InitDb so we can rely on count
+func TestCsvITInitCdrDb(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ if err := engine.InitStorDb(csvCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+/*
+func TestCsvITCreateCdrDirs(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ for _, cdrcProfiles := range csvCfg.CdrcProfiles {
+ for _, cdrcInst := range cdrcProfiles {
+ for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
+ if err := os.RemoveAll(dir); err != nil {
+ t.Fatal("Error removing folder: ", dir, err)
+ }
+ if err := os.MkdirAll(dir, 0755); err != nil {
+ t.Fatal("Error creating folder: ", dir, err)
+ }
+ }
+ }
+ }
+}
+
+
+func TestCsvITStartEngine(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+*/
+
+// Connect rpc client to rater
+func TestCsvITRpcConn(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ var err error
+ cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal("Could not connect to rater: ", err.Error())
+ }
+}
+
+// The default scenario, out of cdrc defined in .cfg file
+func TestCsvITHandleCdr1File(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ fileName := "file1.csv"
+ tmpFilePath := path.Join("/tmp", fileName)
+ if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil {
+ t.Fatal(err.Error)
+ }
+ if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit1/in", fileName)); err != nil {
+ t.Fatal("Error moving file to processing directory: ", err)
+ }
+}
+
+// Scenario out of first .xml config
+func TestCsvITHandleCdr2File(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ fileName := "file2.csv"
+ tmpFilePath := path.Join("/tmp", fileName)
+ if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent2), 0644); err != nil {
+ t.Fatal(err.Error)
+ }
+ if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit2/in", fileName)); err != nil {
+ t.Fatal("Error moving file to processing directory: ", err)
+ }
+}
+
+func TestCsvITProcessedFiles(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
+ if outContent1, err := ioutil.ReadFile("/tmp/cdrctests/csvit1/out/file1.csv"); err != nil {
+ t.Error(err)
+ } else if fileContent1 != string(outContent1) {
+ t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent1))
+ }
+ if outContent2, err := ioutil.ReadFile("/tmp/cdrctests/csvit2/out/file2.csv"); err != nil {
+ t.Error(err)
+ } else if fileContent2 != string(outContent2) {
+ t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent2))
+ }
+}
+
+func TestCsvITKillEngine(t *testing.T) {
+ if !*testIT {
+ return
+ }
+ if err := engine.KillEngine(*waitRater); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/config/config.go b/config/config.go
index f9682ae58..016554415 100644
--- a/config/config.go
+++ b/config/config.go
@@ -853,7 +853,7 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make([]*CdrcConfig, 0)
}
var cdrcInstCfg *CdrcConfig
- if *jsnCrc1Cfg.Id == utils.META_DEFAULT {
+ if *jsnCrc1Cfg.Id == utils.META_DEFAULT && self.dfltCdrcProfile == nil {
cdrcInstCfg = new(CdrcConfig)
} else {
cdrcInstCfg = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers
diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json
index b8fd14d00..b08f4d751 100644
--- a/data/conf/cgrates/cgrates.json
+++ b/data/conf/cgrates/cgrates.json
@@ -145,8 +145,9 @@
// },
-// "cdrc": {
-// "*default": {
+// "cdrc": [
+// {
+// "id": "*default", // identifier of the CDRC runner
// "enabled": false, // enable CDR client functionality
// "dry_run": false, // do not send the CDRs to CDRS, just parse them
// "cdrs_conns": [
@@ -181,8 +182,9 @@
// {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true},
// ],
// "trailer_fields": [], // template of the import trailer fields
-// }
-// },
+// },
+// ],
+
// "sm_generic": {
// "enabled": false, // starts SessionManager service:
diff --git a/data/conf/samples/cdrccsv/cgrates.json b/data/conf/samples/cdrccsv/cgrates.json
new file mode 100644
index 000000000..91c0ca30a
--- /dev/null
+++ b/data/conf/samples/cdrccsv/cgrates.json
@@ -0,0 +1,67 @@
+{
+
+// Real-time Charging System for Telecom & ISP environments
+// Copyright (C) ITsysCOM GmbH
+//
+// This file contains the default configuration hardcoded into CGRateS.
+// This is what you get when you load CGRateS with an empty configuration file.
+
+
+ "cdrs": {
+ "enabled": true,
+ "rals_conns": [], // no rating support, just *raw CDR testing
+},
+
+
+
+ "cdrc": [
+ {
+ "id": "*default",
+ "enabled": true,
+ "cdr_in_dir": "/tmp/cdrctests/csvit1/in",
+ "cdr_out_dir": "/tmp/cdrctests/csvit1/out",
+ "cdr_source_id": "csvit1",
+// "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+// {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true},
+// {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "3", "mandatory": true},
+// {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "4", "mandatory": true},
+// {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "5", "mandatory": true},
+// {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "6", "mandatory": true},
+// {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "7", "mandatory": true},
+// {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "8", "mandatory": true},
+// {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "9", "mandatory": true},
+// {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "10", "mandatory": true},
+// {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "11", "mandatory": true},
+// {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "12", "mandatory": true},
+// {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true},
+// ],
+ },
+ {
+ "id": "*CSVit2", // identifier of the CDRC runner
+ "enabled": true, // enable CDR client functionality
+ "field_separator": ";",
+ "cdr_in_dir": "/tmp/cdrctests/csvit2/in", // absolute path towards the directory where the CDRs are stored
+ "cdr_out_dir": "/tmp/cdrctests/csvit2/out", // absolute path towards the directory where processed CDRs will be moved
+ "cdr_source_id": "csvit2", // free form field, tag identifying the source of the CDRs within CDRS database
+ "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+ {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
+ {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true},
+ {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true},
+ {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
+ {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true},
+ {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
+ {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true},
+ {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true},
+ {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true},
+ {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true},
+ {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true},
+ {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true},
+ {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true},
+ {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true},
+ {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true},
+ ],
+ },
+],
+
+
+}
\ No newline at end of file
diff --git a/data/tariffplans/cdrstats/CdrStats.csv b/data/tariffplans/cdrstats/CdrStats.csv
index bf60b63b4..e52e8c15c 100644
--- a/data/tariffplans/cdrstats/CdrStats.csv
+++ b/data/tariffplans/cdrstats/CdrStats.csv
@@ -1,6 +1,6 @@
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
-CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
+CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,*default,rif,rif,0;2,CDRST3_WARN_ASR
CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
-CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
+CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,*default,,,,CDRST4_WARN_ASR
CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
diff --git a/engine/cdrs.go b/engine/cdrs.go
index dad06dd92..a8bfbd3c3 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -86,13 +86,13 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, use
if stats == nil || reflect.ValueOf(stats).IsNil() {
stats = nil
}
- return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
+ return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
}
type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
- client rpcclient.RpcClientConnection
+ rals rpcclient.RpcClientConnection
pubsub rpcclient.RpcClientConnection
users rpcclient.RpcClientConnection
aliases rpcclient.RpcClientConnection
@@ -183,23 +183,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err
return err
}
for _, cdr := range cdrs {
- // replace user profile fields
- if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
- return err
- }
- // replace aliases for cases they were loaded after CDR received
- if err := LoadAlias(&AttrMatchingAlias{
- Destination: cdr.Destination,
- Direction: cdr.Direction,
- Tenant: cdr.Tenant,
- Category: cdr.Category,
- Account: cdr.Account,
- Subject: cdr.Subject,
- Context: utils.ALIAS_CONTEXT_RATING,
- }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
- return err
- }
- if err := self.rateStoreStatsReplicate(cdr, sendToStats); err != nil {
+ if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
@@ -223,7 +207,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
if cdr.Subject == "" { // Use account information as rating subject if missing
cdr.Subject = cdr.Account
}
- if !cdr.Rated {
+ if !cdr.Rated { // Enforce the RunID if CDR is not rated
cdr.RunID = utils.MetaRaw
}
if self.cgrCfg.CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status
@@ -231,24 +215,92 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
cdr.CostDetails.UpdateCost()
cdr.CostDetails.UpdateRatedUsage()
}
- if err := self.cdrDb.SetCDR(cdr, false); err != nil { // Only original CDR stored in primary table, no derived
+ if err := self.cdrDb.SetCDR(cdr, false); err != nil {
utils.Logger.Err(fmt.Sprintf(" Storing primary CDR %+v, got error: %s", cdr, err.Error()))
return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
}
}
- go self.deriveRateStoreStatsReplicate(cdr)
+ // Attach raw CDR to stats
+ if self.stats != nil { // Send raw CDR to stats
+ var out int
+ go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out)
+ }
+ if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR
+ go self.replicateCdr(cdr)
+ }
+
+ if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating
+ go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0)
+ }
return nil
}
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
-func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error {
+func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, replicate bool) error {
cdrRuns, err := self.deriveCdrs(cdr)
if err != nil {
return err
}
+ var ratedCDRs []*CDR // Gather all CDRs received from rating subsystem
for _, cdrRun := range cdrRuns {
- if err := self.rateStoreStatsReplicate(cdrRun, true); err != nil {
- return err
+ utils.Logger.Debug(fmt.Sprintf("Processing CDR run: %+v", cdrRun))
+ if err := LoadUserProfile(cdrRun, utils.EXTRA_FIELDS); err != nil {
+ utils.Logger.Err(fmt.Sprintf(" UserS handling for CDR %+v, got error: %s", cdrRun, err.Error()))
+ continue
+ }
+ if err := LoadAlias(&AttrMatchingAlias{
+ Destination: cdrRun.Destination,
+ Direction: cdrRun.Direction,
+ Tenant: cdrRun.Tenant,
+ Category: cdrRun.Category,
+ Account: cdrRun.Account,
+ Subject: cdrRun.Subject,
+ Context: utils.ALIAS_CONTEXT_RATING,
+ }, cdrRun, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
+ utils.Logger.Err(fmt.Sprintf(" Aliasing CDR %+v, got error: %s", cdrRun, err.Error()))
+ continue
+ }
+ rcvRatedCDRs, err := self.rateCDR(cdrRun)
+ if err != nil {
+ cdrRun.Cost = -1.0 // If there was an error, mark the CDR
+ cdrRun.ExtraInfo = err.Error()
+ rcvRatedCDRs = []*CDR{cdrRun}
+ }
+ ratedCDRs = append(ratedCDRs, rcvRatedCDRs...)
+ }
+ // Request should be processed by SureTax
+ for _, ratedCDR := range ratedCDRs {
+ if ratedCDR.RunID == utils.META_SURETAX {
+ if err := SureTaxProcessCdr(ratedCDR); err != nil {
+ ratedCDR.Cost = -1.0
+ ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
+ }
+ }
+ }
+ // Store rated CDRs
+ if store {
+ for _, ratedCDR := range ratedCDRs {
+ if ratedCDR.CostDetails != nil {
+ ratedCDR.CostDetails.UpdateCost()
+ ratedCDR.CostDetails.UpdateRatedUsage()
+ }
+ if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
+ utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
+ }
+ }
+ }
+ // Attach CDR to stats
+ if stats { // Send CDR to stats
+ for _, ratedCDR := range ratedCDRs {
+ var out int
+ if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil {
+ utils.Logger.Err(fmt.Sprintf(" Could not send CDR to stats: %s", err.Error()))
+ }
+ }
+ }
+ if replicate {
+ for _, ratedCDR := range ratedCDRs {
+ self.replicateCdr(ratedCDR)
}
}
return nil
@@ -259,6 +311,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs
return cdrRuns, nil
}
+ cdr.RunID = utils.META_DEFAULT // Rewrite *raw with *default since we have it as first run
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
return nil, err
}
@@ -276,7 +329,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: cdr.Direction,
Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination}
var dcs utils.DerivedChargers
- if err := self.client.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
+ if err := self.rals.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", cdr.CGRID, err.Error()))
return nil, err
}
@@ -321,73 +374,6 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
return cdrRuns, nil
}
-func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR, sendToStats bool) error {
- if cdr.RunID == utils.MetaRaw { // Overwrite *raw with *default for rating
- cdr.RunID = utils.META_DEFAULT
- }
- if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
- return err
- }
- if err := LoadAlias(&AttrMatchingAlias{
- Destination: cdr.Destination,
- Direction: cdr.Direction,
- Tenant: cdr.Tenant,
- Category: cdr.Category,
- Account: cdr.Account,
- Subject: cdr.Subject,
- Context: utils.ALIAS_CONTEXT_RATING,
- }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
- return err
- }
- // Rate CDR, can receive multiple due to SMCosts for OriginIDPrefix
- var ratedCDRs []*CDR
- var err error
- if cdr.Rated {
- ratedCDRs = []*CDR{cdr}
- } else if self.client != nil && !cdr.Rated {
- if ratedCDRs, err = self.rateCDR(cdr); err != nil {
- cdr.Cost = -1.0 // If there was an error, mark the CDR
- cdr.ExtraInfo = err.Error()
- ratedCDRs = []*CDR{cdr}
- }
- }
- for _, ratedCDR := range ratedCDRs {
- if ratedCDR.RunID == utils.META_SURETAX { // Request should be processed by SureTax
- if err := SureTaxProcessCdr(ratedCDR); err != nil {
- ratedCDR.Cost = -1.0
- ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
- }
- }
- }
- if self.cgrCfg.CDRSStoreCdrs { // Store CDRs
- for _, ratedCDR := range ratedCDRs {
- // Store RatedCDR
- if ratedCDR.CostDetails != nil {
- ratedCDR.CostDetails.UpdateCost()
- ratedCDR.CostDetails.UpdateRatedUsage()
- }
- if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
- }
- }
- }
- // Attach CDR to stats
- if self.stats != nil && sendToStats { // Send CDR to stats
- for _, ratedCDR := range ratedCDRs {
- var out int
- if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Could not send CDR to stats: %s", err.Error()))
- }
- }
- }
- if len(self.cgrCfg.CDRSCdrReplication) != 0 {
- for _, ratedCDR := range ratedCDRs {
- self.replicateCdr(ratedCDR)
- }
- }
- return nil
-}
-
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
@@ -462,9 +448,9 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
PerformRounding: true,
}
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
- err = self.client.Call("Responder.Debit", cd, cc)
+ err = self.rals.Call("Responder.Debit", cd, cc)
} else {
- err = self.client.Call("Responder.GetCost", cd, cc)
+ err = self.rals.Call("Responder.GetCost", cd, cc)
}
if err != nil {
return cc, err
diff --git a/engine/stats.go b/engine/stats.go
index 571615ec6..b84fda549 100644
--- a/engine/stats.go
+++ b/engine/stats.go
@@ -289,6 +289,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) {
func (s *Stats) AppendCDR(cdr *CDR, out *int) error {
s.mux.RLock()
defer s.mux.RUnlock()
+ utils.Logger.Debug(fmt.Sprintf("Stats.AppendCDR: %+v", cdr))
for _, sq := range s.queues {
sq.AppendCDR(cdr)
}