From 4fb9a641eec956c7c1e4387f1a423ba2ec2767a2 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 1 May 2016 17:06:25 +0200 Subject: [PATCH] Refactored CDRs internal functions, better CDRC .csv testing --- apier/v1/cdrstatsv1_local_test.go | 26 ++- cdrc/cdrc_local_test.go | 223 ------------------------- cdrc/csv_it_test.go | 185 ++++++++++++++++++++ config/config.go | 2 +- data/conf/cgrates/cgrates.json | 10 +- data/conf/samples/cdrccsv/cgrates.json | 67 ++++++++ data/tariffplans/cdrstats/CdrStats.csv | 4 +- engine/cdrs.go | 176 +++++++++---------- engine/stats.go | 1 + 9 files changed, 364 insertions(+), 330 deletions(-) delete mode 100644 cdrc/cdrc_local_test.go create mode 100644 cdrc/csv_it_test.go create mode 100644 data/conf/samples/cdrccsv/cgrates.json diff --git a/apier/v1/cdrstatsv1_local_test.go b/apier/v1/cdrstatsv1_local_test.go index 9d6b82130..814b36d00 100644 --- a/apier/v1/cdrstatsv1_local_test.go +++ b/apier/v1/cdrstatsv1_local_test.go @@ -104,6 +104,13 @@ func TestCDRStatsLclGetQueueIds2(t *testing.T) { } else if len(eQueueIds) != len(queueIds) { t.Errorf("Expecting: %v, received: %v", eQueueIds, queueIds) } + var rcvMetrics map[string]float64 + expectedMetrics := map[string]float64{"ASR": -1, "ACD": -1} + if err := cdrstRpc.Call("CDRStatsV1.GetMetrics", AttrGetMetrics{StatsQueueId: "CDRST4"}, &rcvMetrics); err != nil { + t.Error("Calling CDRStatsV1.GetMetrics, got error: ", err.Error()) + } else if !reflect.DeepEqual(expectedMetrics, rcvMetrics) { + t.Errorf("Expecting: %v, received: %v", expectedMetrics, rcvMetrics) + } } func TestCDRStatsLclPostCdrs(t *testing.T) { @@ -112,28 +119,28 @@ func TestCDRStatsLclPostCdrs(t *testing.T) { } httpClient := new(http.Client) storedCdrs := []*engine.CDR{ - &engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf", + &engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafa", OriginHost: "192.168.1.1", Source: "test", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, - &engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf", + &engine.CDR{CGRID: utils.Sha1("dsafdsafb", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafb", OriginHost: "192.168.1.1", Source: "test", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, - &engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf", + &engine.CDR{CGRID: utils.Sha1("dsafdsafc", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafc", OriginHost: "192.168.1.1", Source: "test", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Now(), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, - &engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf", + &engine.CDR{CGRID: utils.Sha1("dsafdsafd", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafd", OriginHost: "192.168.1.1", Source: "test", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), AnswerTime: time.Time{}, @@ -146,7 +153,7 @@ func TestCDRStatsLclPostCdrs(t *testing.T) { t.Error(err.Error()) } } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) + time.Sleep(time.Duration(1000) * time.Millisecond) } func TestCDRStatsLclGetMetrics1(t *testing.T) { @@ -204,3 +211,12 @@ func TestCDRStatsLclResetMetrics(t *testing.T) { t.Errorf("Expecting: %v, received: %v", expectedMetrics2, rcvMetrics2) } } + +func TestCDRStatsLclKillEngine(t *testing.T) { + if !*testLocal { + return + } + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go deleted file mode 100644 index 44832531f..000000000 --- a/cdrc/cdrc_local_test.go +++ /dev/null @@ -1,223 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package cdrc - -import ( - "errors" - "flag" - "fmt" - "io/ioutil" - "os" - "os/exec" - "path" - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -/* -README: - - Enable local tests by passing '-local' to the go test command - It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments. - Prior running the tests, create database and users by running: - mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql - What these tests do: - * Flush tables in storDb. - * Start engine with default configuration and give it some time to listen (here caching can slow down). - * -*/ - -var cfgPath string -var cfg *config.CGRConfig -var cdrcCfgs []*config.CdrcConfig -var cdrcCfg *config.CdrcConfig - -var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args -var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") -var storDbType = flag.String("stordb_type", "mysql", "The type of the storDb database ") -var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache") - -var fileContent1 = `accid11,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -accid12,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -dummy_data -accid13,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -` - -var fileContent2 = `accid21,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -accid22,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -#accid1,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1 -accid23,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1` - -var fileContent3 = `accid31;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1 -accid32;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1 -#accid1;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1 -accid33;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1` - -func startEngine() error { - enginePath, err := exec.LookPath("cgr-engine") - if err != nil { - return errors.New("Cannot find cgr-engine executable") - } - stopEngine() - engine := exec.Command(enginePath, "-config", cfgPath) - if err := engine.Start(); err != nil { - return fmt.Errorf("Cannot start cgr-engine: %s", err.Error()) - } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up - return nil -} - -func stopEngine() error { - exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it - return nil -} - -// Need it here and not in init since Travis has no possibility to load local file -func TestCsvLclLoadConfigt(*testing.T) { - if !*testLocal { - return - } - cfgPath = path.Join(*dataDir, "conf", "samples", "apier") - cfg, _ = config.NewCGRConfigFromFolder(cfgPath) - if len(cfg.CdrcProfiles) > 0 { - cdrcCfgs = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"] - } -} - -func TestCsvLclEmptyTables(t *testing.T) { - if !*testLocal { - return - } - if *storDbType != utils.MYSQL { - t.Fatal("Unsupported storDbType") - } - mysql, err := engine.NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns) - if err != nil { - t.Fatal("Error on opening database connection: ", err) - } - for _, scriptName := range []string{utils.CREATE_CDRS_TABLES_SQL, utils.CREATE_TARIFFPLAN_TABLES_SQL} { - if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil { - t.Fatal("Error on mysql creation: ", err.Error()) - return // No point in going further - } - } - if _, err := mysql.Db.Query(fmt.Sprintf("SELECT 1 from %s", utils.TBL_CDRS)); err != nil { - t.Fatal(err.Error()) - } -} - -// Creates cdr files and starts the engine -func TestCsvLclCreateCdrFiles(t *testing.T) { - if !*testLocal { - return - } - if cdrcCfgs == nil { - t.Fatal("Empty default cdrc configuration") - } - for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one - break - } - if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil { - t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err) - } - if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil { - t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err) - } - if err := os.RemoveAll(cdrcCfg.CdrOutDir); err != nil { - t.Fatal("Error removing folder: ", cdrcCfg.CdrOutDir, err) - } - if err := os.MkdirAll(cdrcCfg.CdrOutDir, 0755); err != nil { - t.Fatal("Error creating folder: ", cdrcCfg.CdrOutDir, err) - } - if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file1.csv"), []byte(fileContent1), 0644); err != nil { - t.Fatal(err.Error) - } - if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file2.csv"), []byte(fileContent2), 0644); err != nil { - t.Fatal(err.Error) - } - -} - -func TestCsvLclProcessCdrDir(t *testing.T) { - if !*testLocal { - return - } - var cdrcCfg *config.CdrcConfig - for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one - break - } - for _, cdrsConn := range cdrcCfg.CdrsConns { - if cdrsConn.Address == utils.MetaInternal { - cdrsConn.Address = "127.0.0.1:2013" - } - } - if err := startEngine(); err != nil { - t.Fatal(err.Error()) - } - cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "") - if err != nil { - t.Fatal(err.Error()) - } - if err := cdrc.processCdrDir(); err != nil { - t.Error(err) - } - stopEngine() -} - -// Creates cdr files and starts the engine -func TestCsvLclCreateCdr3File(t *testing.T) { - if !*testLocal { - return - } - if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil { - t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err) - } - if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil { - t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err) - } - if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil { - t.Fatal(err.Error) - } -} - -func TestCsvLclProcessCdr3Dir(t *testing.T) { - if !*testLocal { - return - } - for _, cdrsConn := range cdrcCfg.CdrsConns { - if cdrsConn.Address == utils.MetaInternal { - cdrsConn.Address = "127.0.0.1:2013" - } - } - if err := startEngine(); err != nil { - t.Fatal(err.Error()) - } - cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}), "") - if err != nil { - t.Fatal(err.Error()) - } - if err := cdrc.processCdrDir(); err != nil { - t.Error(err) - } - stopEngine() -} diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go new file mode 100644 index 000000000..ea2491e05 --- /dev/null +++ b/cdrc/csv_it_test.go @@ -0,0 +1,185 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "flag" + "io/ioutil" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +/* +README: + + Enable local tests by passing '-local' to the go test command + It is expected that the data folder of CGRateS exists at path /usr/share/cgrates/data or passed via command arguments. + Prior running the tests, create database and users by running: + mysql -pyourrootpwd < /usr/share/cgrates/data/storage/mysql/create_db_with_users.sql + What these tests do: + * Flush tables in storDb. + * Start engine with default configuration and give it some time to listen (here caching can slow down). + * +*/ + +var csvCfgPath string +var csvCfg *config.CGRConfig +var cdrcCfgs []*config.CdrcConfig +var cdrcCfg *config.CdrcConfig +var cdrcRpc *rpc.Client + +var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args +var testIT = flag.Bool("integration", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args +var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") +var waitRater = flag.Int("wait_rater", 300, "Number of miliseconds to wait for rater to start and cache") + +var fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10s,1.0100,val_extra3,"",val_extra1 +dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,*rated,*out,cgrates.org,call,1001,1001,+4986517174964,2013-11-07 09:42:25 +0000 UTC,2013-11-07 09:42:26 +0000 UTC,20s,1.0100,val_extra3,"",val_extra1 +` + +var fileContent2 = `accid21;*prepaid;itsyscom.com;1001;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 +accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1 +#accid1;*pseudoprepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;12;val_extra3;"";val_extra1 +accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1` + +func TestCsvITInitConfig(t *testing.T) { + if !*testIT { + return + } + var err error + csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsv") + if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestCsvITInitCdrDb(t *testing.T) { + if !*testIT { + return + } + if err := engine.InitStorDb(csvCfg); err != nil { + t.Fatal(err) + } +} + +/* +func TestCsvITCreateCdrDirs(t *testing.T) { + if !*testIT { + return + } + for _, cdrcProfiles := range csvCfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + } + } +} + + +func TestCsvITStartEngine(t *testing.T) { + if !*testIT { + return + } + if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} +*/ + +// Connect rpc client to rater +func TestCsvITRpcConn(t *testing.T) { + if !*testIT { + return + } + var err error + cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func TestCsvITHandleCdr1File(t *testing.T) { + if !*testIT { + return + } + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit1/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +// Scenario out of first .xml config +func TestCsvITHandleCdr2File(t *testing.T) { + if !*testIT { + return + } + fileName := "file2.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent2), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctests/csvit2/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestCsvITProcessedFiles(t *testing.T) { + if !*testIT { + return + } + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) + if outContent1, err := ioutil.ReadFile("/tmp/cdrctests/csvit1/out/file1.csv"); err != nil { + t.Error(err) + } else if fileContent1 != string(outContent1) { + t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent1)) + } + if outContent2, err := ioutil.ReadFile("/tmp/cdrctests/csvit2/out/file2.csv"); err != nil { + t.Error(err) + } else if fileContent2 != string(outContent2) { + t.Errorf("Expecting: %q, received: %q", fileContent1, string(outContent2)) + } +} + +func TestCsvITKillEngine(t *testing.T) { + if !*testIT { + return + } + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/config/config.go b/config/config.go index f9682ae58..016554415 100644 --- a/config/config.go +++ b/config/config.go @@ -853,7 +853,7 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make([]*CdrcConfig, 0) } var cdrcInstCfg *CdrcConfig - if *jsnCrc1Cfg.Id == utils.META_DEFAULT { + if *jsnCrc1Cfg.Id == utils.META_DEFAULT && self.dfltCdrcProfile == nil { cdrcInstCfg = new(CdrcConfig) } else { cdrcInstCfg = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index b8fd14d00..b08f4d751 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -145,8 +145,9 @@ // }, -// "cdrc": { -// "*default": { +// "cdrc": [ +// { +// "id": "*default", // identifier of the CDRC runner // "enabled": false, // enable CDR client functionality // "dry_run": false, // do not send the CDRs to CDRS, just parse them // "cdrs_conns": [ @@ -181,8 +182,9 @@ // {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true}, // ], // "trailer_fields": [], // template of the import trailer fields -// } -// }, +// }, +// ], + // "sm_generic": { // "enabled": false, // starts SessionManager service: diff --git a/data/conf/samples/cdrccsv/cgrates.json b/data/conf/samples/cdrccsv/cgrates.json new file mode 100644 index 000000000..91c0ca30a --- /dev/null +++ b/data/conf/samples/cdrccsv/cgrates.json @@ -0,0 +1,67 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + + "cdrs": { + "enabled": true, + "rals_conns": [], // no rating support, just *raw CDR testing +}, + + + + "cdrc": [ + { + "id": "*default", + "enabled": true, + "cdr_in_dir": "/tmp/cdrctests/csvit1/in", + "cdr_out_dir": "/tmp/cdrctests/csvit1/out", + "cdr_source_id": "csvit1", +// "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value +// {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true}, +// {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "3", "mandatory": true}, +// {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "4", "mandatory": true}, +// {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "5", "mandatory": true}, +// {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "6", "mandatory": true}, +// {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "7", "mandatory": true}, +// {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "8", "mandatory": true}, +// {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "9", "mandatory": true}, +// {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "10", "mandatory": true}, +// {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "11", "mandatory": true}, +// {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "12", "mandatory": true}, +// {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true}, +// ], + }, + { + "id": "*CSVit2", // identifier of the CDRC runner + "enabled": true, // enable CDR client functionality + "field_separator": ";", + "cdr_in_dir": "/tmp/cdrctests/csvit2/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctests/csvit2/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "csvit2", // free form field, tag identifying the source of the CDRs within CDRS database + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true}, + ], + }, +], + + +} \ No newline at end of file diff --git a/data/tariffplans/cdrstats/CdrStats.csv b/data/tariffplans/cdrstats/CdrStats.csv index bf60b63b4..e52e8c15c 100644 --- a/data/tariffplans/cdrstats/CdrStats.csv +++ b/data/tariffplans/cdrstats/CdrStats.csv @@ -1,6 +1,6 @@ #Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] -CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR +CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,*default,rif,rif,0;2,CDRST3_WARN_ASR CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC -CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR +CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,*default,,,,CDRST4_WARN_ASR CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD diff --git a/engine/cdrs.go b/engine/cdrs.go index dad06dd92..a8bfbd3c3 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -86,13 +86,13 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, use if stats == nil || reflect.ValueOf(stats).IsNil() { stats = nil } - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil + return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil } type CdrServer struct { cgrCfg *config.CGRConfig cdrDb CdrStorage - client rpcclient.RpcClientConnection + rals rpcclient.RpcClientConnection pubsub rpcclient.RpcClientConnection users rpcclient.RpcClientConnection aliases rpcclient.RpcClientConnection @@ -183,23 +183,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err return err } for _, cdr := range cdrs { - // replace user profile fields - if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { - return err - } - // replace aliases for cases they were loaded after CDR received - if err := LoadAlias(&AttrMatchingAlias{ - Destination: cdr.Destination, - Direction: cdr.Direction, - Tenant: cdr.Tenant, - Category: cdr.Category, - Account: cdr.Account, - Subject: cdr.Subject, - Context: utils.ALIAS_CONTEXT_RATING, - }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { - return err - } - if err := self.rateStoreStatsReplicate(cdr, sendToStats); err != nil { + if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil { utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) } } @@ -223,7 +207,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { if cdr.Subject == "" { // Use account information as rating subject if missing cdr.Subject = cdr.Account } - if !cdr.Rated { + if !cdr.Rated { // Enforce the RunID if CDR is not rated cdr.RunID = utils.MetaRaw } if self.cgrCfg.CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status @@ -231,24 +215,92 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { cdr.CostDetails.UpdateCost() cdr.CostDetails.UpdateRatedUsage() } - if err := self.cdrDb.SetCDR(cdr, false); err != nil { // Only original CDR stored in primary table, no derived + if err := self.cdrDb.SetCDR(cdr, false); err != nil { utils.Logger.Err(fmt.Sprintf(" Storing primary CDR %+v, got error: %s", cdr, err.Error())) return err // Error is propagated back and we don't continue processing the CDR if we cannot store it } } - go self.deriveRateStoreStatsReplicate(cdr) + // Attach raw CDR to stats + if self.stats != nil { // Send raw CDR to stats + var out int + go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out) + } + if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR + go self.replicateCdr(cdr) + } + + if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating + go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0) + } return nil } // Returns error if not able to properly store the CDR, mediation is async since we can always recover offline -func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error { +func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, replicate bool) error { cdrRuns, err := self.deriveCdrs(cdr) if err != nil { return err } + var ratedCDRs []*CDR // Gather all CDRs received from rating subsystem for _, cdrRun := range cdrRuns { - if err := self.rateStoreStatsReplicate(cdrRun, true); err != nil { - return err + utils.Logger.Debug(fmt.Sprintf("Processing CDR run: %+v", cdrRun)) + if err := LoadUserProfile(cdrRun, utils.EXTRA_FIELDS); err != nil { + utils.Logger.Err(fmt.Sprintf(" UserS handling for CDR %+v, got error: %s", cdrRun, err.Error())) + continue + } + if err := LoadAlias(&AttrMatchingAlias{ + Destination: cdrRun.Destination, + Direction: cdrRun.Direction, + Tenant: cdrRun.Tenant, + Category: cdrRun.Category, + Account: cdrRun.Account, + Subject: cdrRun.Subject, + Context: utils.ALIAS_CONTEXT_RATING, + }, cdrRun, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + utils.Logger.Err(fmt.Sprintf(" Aliasing CDR %+v, got error: %s", cdrRun, err.Error())) + continue + } + rcvRatedCDRs, err := self.rateCDR(cdrRun) + if err != nil { + cdrRun.Cost = -1.0 // If there was an error, mark the CDR + cdrRun.ExtraInfo = err.Error() + rcvRatedCDRs = []*CDR{cdrRun} + } + ratedCDRs = append(ratedCDRs, rcvRatedCDRs...) + } + // Request should be processed by SureTax + for _, ratedCDR := range ratedCDRs { + if ratedCDR.RunID == utils.META_SURETAX { + if err := SureTaxProcessCdr(ratedCDR); err != nil { + ratedCDR.Cost = -1.0 + ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo + } + } + } + // Store rated CDRs + if store { + for _, ratedCDR := range ratedCDRs { + if ratedCDR.CostDetails != nil { + ratedCDR.CostDetails.UpdateCost() + ratedCDR.CostDetails.UpdateRatedUsage() + } + if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil { + utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error())) + } + } + } + // Attach CDR to stats + if stats { // Send CDR to stats + for _, ratedCDR := range ratedCDRs { + var out int + if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not send CDR to stats: %s", err.Error())) + } + } + } + if replicate { + for _, ratedCDR := range ratedCDRs { + self.replicateCdr(ratedCDR) } } return nil @@ -259,6 +311,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) { if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs return cdrRuns, nil } + cdr.RunID = utils.META_DEFAULT // Rewrite *raw with *default since we have it as first run if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { return nil, err } @@ -276,7 +329,7 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) { attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: cdr.Direction, Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination} var dcs utils.DerivedChargers - if err := self.client.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil { + if err := self.rals.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil { utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", cdr.CGRID, err.Error())) return nil, err } @@ -321,73 +374,6 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) { return cdrRuns, nil } -func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR, sendToStats bool) error { - if cdr.RunID == utils.MetaRaw { // Overwrite *raw with *default for rating - cdr.RunID = utils.META_DEFAULT - } - if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { - return err - } - if err := LoadAlias(&AttrMatchingAlias{ - Destination: cdr.Destination, - Direction: cdr.Direction, - Tenant: cdr.Tenant, - Category: cdr.Category, - Account: cdr.Account, - Subject: cdr.Subject, - Context: utils.ALIAS_CONTEXT_RATING, - }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { - return err - } - // Rate CDR, can receive multiple due to SMCosts for OriginIDPrefix - var ratedCDRs []*CDR - var err error - if cdr.Rated { - ratedCDRs = []*CDR{cdr} - } else if self.client != nil && !cdr.Rated { - if ratedCDRs, err = self.rateCDR(cdr); err != nil { - cdr.Cost = -1.0 // If there was an error, mark the CDR - cdr.ExtraInfo = err.Error() - ratedCDRs = []*CDR{cdr} - } - } - for _, ratedCDR := range ratedCDRs { - if ratedCDR.RunID == utils.META_SURETAX { // Request should be processed by SureTax - if err := SureTaxProcessCdr(ratedCDR); err != nil { - ratedCDR.Cost = -1.0 - ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo - } - } - } - if self.cgrCfg.CDRSStoreCdrs { // Store CDRs - for _, ratedCDR := range ratedCDRs { - // Store RatedCDR - if ratedCDR.CostDetails != nil { - ratedCDR.CostDetails.UpdateCost() - ratedCDR.CostDetails.UpdateRatedUsage() - } - if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil { - utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error())) - } - } - } - // Attach CDR to stats - if self.stats != nil && sendToStats { // Send CDR to stats - for _, ratedCDR := range ratedCDRs { - var out int - if err := self.stats.Call("CDRStatsV1.AppendCDR", ratedCDR, &out); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send CDR to stats: %s", err.Error())) - } - } - } - if len(self.cgrCfg.CDRSCdrReplication) != 0 { - for _, ratedCDR := range ratedCDRs { - self.replicateCdr(ratedCDR) - } - } - return nil -} - // rateCDR will populate cost field // Returns more than one rated CDR in case of SMCost retrieved based on prefix func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { @@ -462,9 +448,9 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { PerformRounding: true, } if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM - err = self.client.Call("Responder.Debit", cd, cc) + err = self.rals.Call("Responder.Debit", cd, cc) } else { - err = self.client.Call("Responder.GetCost", cd, cc) + err = self.rals.Call("Responder.GetCost", cd, cc) } if err != nil { return cc, err diff --git a/engine/stats.go b/engine/stats.go index 571615ec6..b84fda549 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -289,6 +289,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) { func (s *Stats) AppendCDR(cdr *CDR, out *int) error { s.mux.RLock() defer s.mux.RUnlock() + utils.Logger.Debug(fmt.Sprintf("Stats.AppendCDR: %+v", cdr)) for _, sq := range s.queues { sq.AppendCDR(cdr) }