From b487bbed3680c3f123b2517d466bc205457a88ff Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jul 2015 21:48:33 +0200 Subject: [PATCH] Initial flatstore local test --- cdrc/cdrc_test.go | 119 +----------- cdrc/flatstore_local_test.go | 184 +++++++++++++++++++ config/config.go | 1 - config/config_defaults.go | 14 +- data/conf/samples/cdrcflatstore/cgrates.json | 59 ++++++ 5 files changed, 252 insertions(+), 125 deletions(-) create mode 100644 cdrc/flatstore_local_test.go create mode 100644 data/conf/samples/cdrcflatstore/cgrates.json diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index ea315fa2a..0277ecee9 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -125,123 +125,6 @@ func TestDataMultiplyFactor(t *testing.T) { } } -/* -func TestDnTdmCdrs(t *testing.T) { - tdmCdrs := ` -49773280254,0049LN130676000285,N_IP_0676_00-Internet 0676 WRAP 13,02.07.2014 15:24:40,02.07.2014 15:24:40,1,25,Peak,0.000000,49DE13 -49893252121,0049651515477,N_MO_MRAP_00-WRAP Mobile,02.07.2014 15:24:41,02.07.2014 15:24:41,1,8,Peak,0.003920,49651 -49497361022,0049LM0409005226,N_MO_MTMB_00-RW-Mobile,02.07.2014 15:24:41,02.07.2014 15:24:41,1,43,Peak,0.021050,49MTMB -` - cgrConfig, _ := config.NewDefaultCGRConfig() - eCdrs := []*engine.StoredCdr{ - &engine.StoredCdr{ - CgrId: utils.Sha1("49773280254", time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC).String()), - TOR: utils.VOICE, - AccId: "49773280254", - CdrHost: "0.0.0.0", - CdrSource: cgrConfig.CdrcSourceId, - ReqType: utils.META_RATED, - Direction: "*out", - Tenant: "cgrates.org", - Category: "call", - Account: "+49773280254", - Subject: "+49773280254", - Destination: "+49676000285", - SetupTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC), - AnswerTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC), - Usage: time.Duration(25) * time.Second, - Cost: -1, - }, - &engine.StoredCdr{ - CgrId: utils.Sha1("49893252121", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()), - TOR: utils.VOICE, - AccId: "49893252121", - CdrHost: "0.0.0.0", - CdrSource: cgrConfig.CdrcSourceId, - ReqType: utils.META_RATED, - Direction: "*out", - Tenant: "cgrates.org", - Category: "call", - Account: "+49893252121", - Subject: "+49893252121", - Destination: "+49651515477", - SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), - AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), - Usage: time.Duration(8) * time.Second, - Cost: -1, - }, - &engine.StoredCdr{ - CgrId: utils.Sha1("49497361022", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()), - TOR: utils.VOICE, - AccId: "49497361022", - CdrHost: "0.0.0.0", - CdrSource: cgrConfig.CdrcSourceId, - ReqType: utils.META_RATED, - Direction: "*out", - Tenant: "cgrates.org", - Category: "call", - Account: "+49497361022", - Subject: "+49497361022", - Destination: "+499005226", - SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), - AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), - Usage: time.Duration(43) * time.Second, - Cost: -1, - }, - } - torFld, _ := utils.NewRSRField("^*voice") - acntFld, _ := utils.NewRSRField(`~0:s/^([1-9]\d+)$/+$1/`) - reqTypeFld, _ := utils.NewRSRField("^rated") - dirFld, _ := utils.NewRSRField("^*out") - tenantFld, _ := utils.NewRSRField("^cgrates.org") - categFld, _ := utils.NewRSRField("^call") - dstFld, _ := utils.NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/`) - usageFld, _ := utils.NewRSRField(`~6:s/^(\d+)$/${1}s/`) - cgrConfig.CdrcCdrFields = map[string]*utils.RSRField{ - utils.TOR: torFld, - utils.ACCID: &utils.RSRField{Id: "0"}, - utils.REQTYPE: reqTypeFld, - utils.DIRECTION: dirFld, - utils.TENANT: tenantFld, - utils.CATEGORY: categFld, - utils.ACCOUNT: acntFld, - utils.SUBJECT: acntFld, - utils.DESTINATION: dstFld, - utils.SETUP_TIME: &utils.RSRField{Id: "4"}, - utils.ANSWER_TIME: &utils.RSRField{Id: "4"}, - utils.USAGE: usageFld, - } - cdrc := &Cdrc{cgrConfig.CdrcCdrs, cgrConfig.CdrcCdrFormat, cgrConfig.CdrcCdrInDir, cgrConfig.CdrcCdrOutDir, cgrConfig.CdrcSourceId, cgrConfig.CdrcRunDelay, ',', - cgrConfig.CdrcCdrFields, new(cdrs.CDRS), nil} - cdrsContent := bytes.NewReader([]byte(tdmCdrs)) - csvReader := csv.NewReader(cdrsContent) - cdrs := make([]*engine.StoredCdr, 0) - for { - cdrCsv, err := csvReader.Read() - if err != nil && err == io.EOF { - break // End of file - } else if err != nil { - t.Error("Unexpected error:", err) - } - if cdr, err := cdrc.recordToStoredCdr(cdrCsv); err != nil { - t.Error("Unexpected error: ", err) - } else { - cdrs = append(cdrs, cdr) - } - } - if !reflect.DeepEqual(eCdrs, cdrs) { - for _, ecdr := range eCdrs { - fmt.Printf("Cdr expected: %+v\n", ecdr) - } - for _, cdr := range cdrs { - fmt.Printf("Cdr processed: %+v\n", cdr) - } - t.Errorf("Expecting: %+v, received: %+v", eCdrs, cdrs) - } - -} -*/ - func TestNewPartialFlatstoreRecord(t *testing.T) { ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local), Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}} @@ -526,7 +409,7 @@ INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Bu &config.CfgCdrField{Tag: "Destination", Type: utils.CDRFIELD, CdrFieldId: utils.DESTINATION, Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP), Mandatory: true}, &config.CfgCdrField{Tag: "SetupTime", Type: utils.CDRFIELD, CdrFieldId: utils.SETUP_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true}, &config.CfgCdrField{Tag: "AnswerTime", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true}, - &config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true}, + &config.CfgCdrField{Tag: "Usage", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true}, &config.CfgCdrField{Tag: "DisconnectCause", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE, Value: utils.ParseRSRFieldsMustCompile("4;^ ;5", utils.INFIELD_SEP), Mandatory: true}, &config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)}, }} diff --git a/cdrc/flatstore_local_test.go b/cdrc/flatstore_local_test.go new file mode 100644 index 000000000..31c136a33 --- /dev/null +++ b/cdrc/flatstore_local_test.go @@ -0,0 +1,184 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "io/ioutil" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var flatstoreCfgPath string +var flatstoreCfg *config.CGRConfig +var flatstoreRpc *rpc.Client +var flatstoreCdrcCfg *config.CdrcConfig + +var fullSuccessfull = `INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475 +BYE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454410|||||3401:2069362475 +INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1001||1877:893549741 +BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549741 +INVITE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454657|*prepaid|1001|1002||2407:1884881533 +BYE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454661|||||2407:1884881533 +INVITE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454690|*prepaid|1001|1002||3099:1909036290 +BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290 +` + +var fullMissed = `INVITE|ef6c6256|da501581|0bfdd176d1b93e7df3de5c6f4873ee04@0:0:0:0:0:0:0:0|487|Request Terminated|1436454643|*prepaid|1001|1002||1224:339382783 +INVITE|7905e511||81880da80a94bda81b425b09009e055c@0:0:0:0:0:0:0:0|404|Not Found|1436454668|*prepaid|1001|1002||1980:1216490844 +INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1001||474:130115066` + +var part1 = `BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549742 +` + +var part2 = `INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1003||1877:893549742 +INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475` + +func TestFlatstoreLclInitCfg(t *testing.T) { + if !*testLocal { + return + } + var err error + flatstoreCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcflatstore") + if flatstoreCfg, err = config.NewCGRConfigFromFolder(flatstoreCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestFlatstoreLclInitCdrDb(t *testing.T) { + if !*testLocal { + return + } + if err := engine.InitStorDb(flatstoreCfg); err != nil { + t.Fatal(err) + } +} + +// Creates cdr files and moves them into processing folder +func TestFlatstoreLclCreateCdrFiles(t *testing.T) { + if !*testLocal { + return + } + if flatstoreCfg == nil { + t.Fatal("Empty default cdrc configuration") + } + flatstoreCdrcCfg = flatstoreCfg.CdrcProfiles["/tmp/cgr_flatstore/cdrc/in"]["FLATSTORE"] + if err := os.RemoveAll(flatstoreCdrcCfg.CdrInDir); err != nil { + t.Fatal("Error removing folder: ", flatstoreCdrcCfg.CdrInDir, err) + } + if err := os.MkdirAll(flatstoreCdrcCfg.CdrInDir, 0755); err != nil { + t.Fatal("Error creating folder: ", flatstoreCdrcCfg.CdrInDir, err) + } + if err := os.RemoveAll(flatstoreCdrcCfg.CdrOutDir); err != nil { + t.Fatal("Error removing folder: ", flatstoreCdrcCfg.CdrOutDir, err) + } + if err := os.MkdirAll(flatstoreCdrcCfg.CdrOutDir, 0755); err != nil { + t.Fatal("Error creating folder: ", flatstoreCdrcCfg.CdrOutDir, err) + } +} + +func TestFlatstoreLclStartEngine(t *testing.T) { + if !*testLocal { + return + } + if _, err := engine.StopStartEngine(flatstoreCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestFlatstoreLclRpcConn(t *testing.T) { + if !*testLocal { + return + } + var err error + flatstoreRpc, err = jsonrpc.Dial("tcp", flatstoreCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func TestFlatstoreLclProcessFiles(t *testing.T) { + if !*testLocal { + return + } + if err := ioutil.WriteFile(path.Join("/tmp", "acc_1.log"), []byte(fullSuccessfull), 0644); err != nil { + t.Fatal(err.Error) + } + if err := ioutil.WriteFile(path.Join("/tmp", "missed_calls_1.log"), []byte(fullMissed), 0644); err != nil { + t.Fatal(err.Error) + } + if err := ioutil.WriteFile(path.Join("/tmp", "acc_2.log"), []byte(part1), 0644); err != nil { + t.Fatal(err.Error) + } + if err := ioutil.WriteFile(path.Join("/tmp", "acc_3.log"), []byte(part2), 0644); err != nil { + t.Fatal(err.Error) + } + //Rename(oldpath, newpath string) + for _, fileName := range []string{"acc_1.log", "missed_calls_1.log", "acc_2.log", "acc_3.log"} { + if err := os.Rename(path.Join("/tmp", fileName), path.Join(flatstoreCdrcCfg.CdrInDir, fileName)); err != nil { + t.Fatal(err) + } + } +} + +/* + +// Creates cdr files and starts the engine +func TestCreateCdr3File(t *testing.T) { + if !*testLocal { + return + } + if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil { + t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err) + } + if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil { + t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err) + } + if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil { + t.Fatal(err.Error) + } +} + +func TestProcessCdr3Dir(t *testing.T) { + if !*testLocal { + return + } + if cdrcCfg.Cdrs == utils.INTERNAL { // For now we only test over network + cdrcCfg.Cdrs = "127.0.0.1:2013" + } + if err := startEngine(); err != nil { + t.Fatal(err.Error()) + } + cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{})) + if err != nil { + t.Fatal(err.Error()) + } + if err := cdrc.processCdrDir(); err != nil { + t.Error(err) + } + stopEngine() +} +*/ diff --git a/config/config.go b/config/config.go index 31b6f90b1..1d7dac91c 100644 --- a/config/config.go +++ b/config/config.go @@ -210,7 +210,6 @@ type CGRConfig struct { CDRStatsSaveInterval time.Duration // Save interval duration PubSubEnabled bool HistoryEnabled bool - //CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level CdreProfiles map[string]*CdreConfig CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs} SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration diff --git a/config/config_defaults.go b/config/config_defaults.go index 720554d0e..e199866c4 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -88,10 +88,10 @@ const CGRATES_CFG_JSON = ` "rater": { "enabled": false, // enable Rater service: "balancer": "", // register to balancer as worker: <""|internal|x.y.z.y:1234> - "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> - "historys": "", // address where to reach the history service, empty to disable history functionality<""|internal|x.y.z.y:1234> - "pubusubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality<""|internal|x.y.z.y:1234> - "users": "", // address where to reach the user service, empty to disable user profile functionality<""|internal|x.y.z.y:1234> + "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality: <""|internal|x.y.z.y:1234> + "historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> + "pubusubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> }, @@ -247,14 +247,16 @@ const CGRATES_CFG_JSON = ` "pubsubs": { - "enabled": false, // starts History service: . + "enabled": false, // starts PubSub service: . }, + "users": { - "enabled": false, // starts Users service: . + "enabled": false, // starts User service: . "indexes": [], // user profile field indexes }, + "mailer": { "server": "localhost", // the server to use when sending emails out "auth_user": "cgrates", // authenticate to email server using this user diff --git a/data/conf/samples/cdrcflatstore/cgrates.json b/data/conf/samples/cdrcflatstore/cgrates.json new file mode 100644 index 000000000..d9b6754a3 --- /dev/null +++ b/data/conf/samples/cdrcflatstore/cgrates.json @@ -0,0 +1,59 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + +"rater": { + "enabled": true, // enable Rater service: +}, + + +"scheduler": { + "enabled": true, // start Scheduler service: +}, + + +"cdrs": { + "enabled": true, // start the CDR Server service: +}, + + +"cdrc": { + "FLATSTORE": { + "enabled": true, // enable CDR client functionality + "cdrs": "internal", // address where to reach CDR server. + "cdr_format": "csv", // CDR file format + "field_separator": "|", // separator used in case of csv files + "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify + "max_open_files": 1024, // maximum simultaneous files to process + "data_usage_multiply_factor": 1024, // conversion factor for data usage + "cdr_in_dir": "/tmp/cgr_flatstore/cdrc/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cgr_flatstore/cdrc/out", // absolute path towards the directory where processed CDRs will be moved + "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records + "cdr_source_id": "flatstore", // free form field, tag identifying the source of the CDRs within CDRS database + "cdr_filter": "", // filter CDR records to import + "partial_record_cache": "1s", // duration to cache partial records when not pairing + "cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "^*voice", "mandatory": true}, + {"tag": "AccId", "cdr_field_id": "accid", "type": "cdrfield", "mandatory": true}, + {"tag": "ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "7", "mandatory": true}, + {"tag": "Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "^out", "mandatory": true}, + {"tag": "Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "^cgrates.org", "mandatory": true}, + {"tag": "Category", "cdr_field_id": "category", "type": "cdrfield", "value": "^call", "mandatory": true}, + {"tag": "Account", "cdr_field_id": "account", "type": "cdrfield", "value": "8", "mandatory": true}, + {"tag": "Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "8", "mandatory": true}, + {"tag": "Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "9", "mandatory": true}, + {"tag": "SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "6", "mandatory": true}, + {"tag": "AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "6", "mandatory": true}, + {"tag": "Usage", "cdr_field_id": "usage", "type": "cdrfield", "mandatory": true}, + {"tag": "DisconnectCause", "cdr_field_id": "disconnect_cause", "type": "cdrfield", "value": "4;^ ;5", "mandatory": true}, + {"tag": "DialogId", "cdr_field_id": "DialogId", "type": "cdrfield", "value": "11"}, + ], + }, +}, + +}