Initial flatstore local test

This commit is contained in:
DanB
2015-07-12 21:48:33 +02:00
parent 61344c1dbf
commit b487bbed36
5 changed files with 252 additions and 125 deletions

View File

@@ -125,123 +125,6 @@ func TestDataMultiplyFactor(t *testing.T) {
}
}
/*
func TestDnTdmCdrs(t *testing.T) {
tdmCdrs := `
49773280254,0049LN130676000285,N_IP_0676_00-Internet 0676 WRAP 13,02.07.2014 15:24:40,02.07.2014 15:24:40,1,25,Peak,0.000000,49DE13
49893252121,0049651515477,N_MO_MRAP_00-WRAP Mobile,02.07.2014 15:24:41,02.07.2014 15:24:41,1,8,Peak,0.003920,49651
49497361022,0049LM0409005226,N_MO_MTMB_00-RW-Mobile,02.07.2014 15:24:41,02.07.2014 15:24:41,1,43,Peak,0.021050,49MTMB
`
cgrConfig, _ := config.NewDefaultCGRConfig()
eCdrs := []*engine.StoredCdr{
&engine.StoredCdr{
CgrId: utils.Sha1("49773280254", time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC).String()),
TOR: utils.VOICE,
AccId: "49773280254",
CdrHost: "0.0.0.0",
CdrSource: cgrConfig.CdrcSourceId,
ReqType: utils.META_RATED,
Direction: "*out",
Tenant: "cgrates.org",
Category: "call",
Account: "+49773280254",
Subject: "+49773280254",
Destination: "+49676000285",
SetupTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC),
AnswerTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC),
Usage: time.Duration(25) * time.Second,
Cost: -1,
},
&engine.StoredCdr{
CgrId: utils.Sha1("49893252121", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()),
TOR: utils.VOICE,
AccId: "49893252121",
CdrHost: "0.0.0.0",
CdrSource: cgrConfig.CdrcSourceId,
ReqType: utils.META_RATED,
Direction: "*out",
Tenant: "cgrates.org",
Category: "call",
Account: "+49893252121",
Subject: "+49893252121",
Destination: "+49651515477",
SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
Usage: time.Duration(8) * time.Second,
Cost: -1,
},
&engine.StoredCdr{
CgrId: utils.Sha1("49497361022", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()),
TOR: utils.VOICE,
AccId: "49497361022",
CdrHost: "0.0.0.0",
CdrSource: cgrConfig.CdrcSourceId,
ReqType: utils.META_RATED,
Direction: "*out",
Tenant: "cgrates.org",
Category: "call",
Account: "+49497361022",
Subject: "+49497361022",
Destination: "+499005226",
SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
Usage: time.Duration(43) * time.Second,
Cost: -1,
},
}
torFld, _ := utils.NewRSRField("^*voice")
acntFld, _ := utils.NewRSRField(`~0:s/^([1-9]\d+)$/+$1/`)
reqTypeFld, _ := utils.NewRSRField("^rated")
dirFld, _ := utils.NewRSRField("^*out")
tenantFld, _ := utils.NewRSRField("^cgrates.org")
categFld, _ := utils.NewRSRField("^call")
dstFld, _ := utils.NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/`)
usageFld, _ := utils.NewRSRField(`~6:s/^(\d+)$/${1}s/`)
cgrConfig.CdrcCdrFields = map[string]*utils.RSRField{
utils.TOR: torFld,
utils.ACCID: &utils.RSRField{Id: "0"},
utils.REQTYPE: reqTypeFld,
utils.DIRECTION: dirFld,
utils.TENANT: tenantFld,
utils.CATEGORY: categFld,
utils.ACCOUNT: acntFld,
utils.SUBJECT: acntFld,
utils.DESTINATION: dstFld,
utils.SETUP_TIME: &utils.RSRField{Id: "4"},
utils.ANSWER_TIME: &utils.RSRField{Id: "4"},
utils.USAGE: usageFld,
}
cdrc := &Cdrc{cgrConfig.CdrcCdrs, cgrConfig.CdrcCdrFormat, cgrConfig.CdrcCdrInDir, cgrConfig.CdrcCdrOutDir, cgrConfig.CdrcSourceId, cgrConfig.CdrcRunDelay, ',',
cgrConfig.CdrcCdrFields, new(cdrs.CDRS), nil}
cdrsContent := bytes.NewReader([]byte(tdmCdrs))
csvReader := csv.NewReader(cdrsContent)
cdrs := make([]*engine.StoredCdr, 0)
for {
cdrCsv, err := csvReader.Read()
if err != nil && err == io.EOF {
break // End of file
} else if err != nil {
t.Error("Unexpected error:", err)
}
if cdr, err := cdrc.recordToStoredCdr(cdrCsv); err != nil {
t.Error("Unexpected error: ", err)
} else {
cdrs = append(cdrs, cdr)
}
}
if !reflect.DeepEqual(eCdrs, cdrs) {
for _, ecdr := range eCdrs {
fmt.Printf("Cdr expected: %+v\n", ecdr)
}
for _, cdr := range cdrs {
fmt.Printf("Cdr processed: %+v\n", cdr)
}
t.Errorf("Expecting: %+v, received: %+v", eCdrs, cdrs)
}
}
*/
func TestNewPartialFlatstoreRecord(t *testing.T) {
ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local),
Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}}
@@ -526,7 +409,7 @@ INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Bu
&config.CfgCdrField{Tag: "Destination", Type: utils.CDRFIELD, CdrFieldId: utils.DESTINATION, Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "SetupTime", Type: utils.CDRFIELD, CdrFieldId: utils.SETUP_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "AnswerTime", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true},
&config.CfgCdrField{Tag: "Usage", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true},
&config.CfgCdrField{Tag: "DisconnectCause", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE, Value: utils.ParseRSRFieldsMustCompile("4;^ ;5", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)},
}}

View File

@@ -0,0 +1,184 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"path"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var flatstoreCfgPath string
var flatstoreCfg *config.CGRConfig
var flatstoreRpc *rpc.Client
var flatstoreCdrcCfg *config.CdrcConfig
var fullSuccessfull = `INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475
BYE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454410|||||3401:2069362475
INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1001||1877:893549741
BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549741
INVITE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454657|*prepaid|1001|1002||2407:1884881533
BYE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454661|||||2407:1884881533
INVITE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454690|*prepaid|1001|1002||3099:1909036290
BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290
`
var fullMissed = `INVITE|ef6c6256|da501581|0bfdd176d1b93e7df3de5c6f4873ee04@0:0:0:0:0:0:0:0|487|Request Terminated|1436454643|*prepaid|1001|1002||1224:339382783
INVITE|7905e511||81880da80a94bda81b425b09009e055c@0:0:0:0:0:0:0:0|404|Not Found|1436454668|*prepaid|1001|1002||1980:1216490844
INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1001||474:130115066`
var part1 = `BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549742
`
var part2 = `INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1003||1877:893549742
INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475`
func TestFlatstoreLclInitCfg(t *testing.T) {
if !*testLocal {
return
}
var err error
flatstoreCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcflatstore")
if flatstoreCfg, err = config.NewCGRConfigFromFolder(flatstoreCfgPath); err != nil {
t.Fatal("Got config error: ", err.Error())
}
}
// InitDb so we can rely on count
func TestFlatstoreLclInitCdrDb(t *testing.T) {
if !*testLocal {
return
}
if err := engine.InitStorDb(flatstoreCfg); err != nil {
t.Fatal(err)
}
}
// Creates cdr files and moves them into processing folder
func TestFlatstoreLclCreateCdrFiles(t *testing.T) {
if !*testLocal {
return
}
if flatstoreCfg == nil {
t.Fatal("Empty default cdrc configuration")
}
flatstoreCdrcCfg = flatstoreCfg.CdrcProfiles["/tmp/cgr_flatstore/cdrc/in"]["FLATSTORE"]
if err := os.RemoveAll(flatstoreCdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", flatstoreCdrcCfg.CdrInDir, err)
}
if err := os.MkdirAll(flatstoreCdrcCfg.CdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", flatstoreCdrcCfg.CdrInDir, err)
}
if err := os.RemoveAll(flatstoreCdrcCfg.CdrOutDir); err != nil {
t.Fatal("Error removing folder: ", flatstoreCdrcCfg.CdrOutDir, err)
}
if err := os.MkdirAll(flatstoreCdrcCfg.CdrOutDir, 0755); err != nil {
t.Fatal("Error creating folder: ", flatstoreCdrcCfg.CdrOutDir, err)
}
}
func TestFlatstoreLclStartEngine(t *testing.T) {
if !*testLocal {
return
}
if _, err := engine.StopStartEngine(flatstoreCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestFlatstoreLclRpcConn(t *testing.T) {
if !*testLocal {
return
}
var err error
flatstoreRpc, err = jsonrpc.Dial("tcp", flatstoreCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func TestFlatstoreLclProcessFiles(t *testing.T) {
if !*testLocal {
return
}
if err := ioutil.WriteFile(path.Join("/tmp", "acc_1.log"), []byte(fullSuccessfull), 0644); err != nil {
t.Fatal(err.Error)
}
if err := ioutil.WriteFile(path.Join("/tmp", "missed_calls_1.log"), []byte(fullMissed), 0644); err != nil {
t.Fatal(err.Error)
}
if err := ioutil.WriteFile(path.Join("/tmp", "acc_2.log"), []byte(part1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := ioutil.WriteFile(path.Join("/tmp", "acc_3.log"), []byte(part2), 0644); err != nil {
t.Fatal(err.Error)
}
//Rename(oldpath, newpath string)
for _, fileName := range []string{"acc_1.log", "missed_calls_1.log", "acc_2.log", "acc_3.log"} {
if err := os.Rename(path.Join("/tmp", fileName), path.Join(flatstoreCdrcCfg.CdrInDir, fileName)); err != nil {
t.Fatal(err)
}
}
}
/*
// Creates cdr files and starts the engine
func TestCreateCdr3File(t *testing.T) {
if !*testLocal {
return
}
if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
}
if err := os.MkdirAll(cdrcCfg.CdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cdrcCfg.CdrInDir, err)
}
if err := ioutil.WriteFile(path.Join(cdrcCfg.CdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil {
t.Fatal(err.Error)
}
}
func TestProcessCdr3Dir(t *testing.T) {
if !*testLocal {
return
}
if cdrcCfg.Cdrs == utils.INTERNAL { // For now we only test over network
cdrcCfg.Cdrs = "127.0.0.1:2013"
}
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}))
if err != nil {
t.Fatal(err.Error())
}
if err := cdrc.processCdrDir(); err != nil {
t.Error(err)
}
stopEngine()
}
*/

View File

@@ -210,7 +210,6 @@ type CGRConfig struct {
CDRStatsSaveInterval time.Duration // Save interval duration
PubSubEnabled bool
HistoryEnabled bool
//CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
CdreProfiles map[string]*CdreConfig
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration

View File

@@ -88,10 +88,10 @@ const CGRATES_CFG_JSON = `
"rater": {
"enabled": false, // enable Rater service: <true|false>
"balancer": "", // register to balancer as worker: <""|internal|x.y.z.y:1234>
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"historys": "", // address where to reach the history service, empty to disable history functionality<""|internal|x.y.z.y:1234>
"pubusubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality<""|internal|x.y.z.y:1234>
"users": "", // address where to reach the user service, empty to disable user profile functionality<""|internal|x.y.z.y:1234>
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality: <""|internal|x.y.z.y:1234>
"historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234>
"pubusubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
},
@@ -247,14 +247,16 @@ const CGRATES_CFG_JSON = `
"pubsubs": {
"enabled": false, // starts History service: <true|false>.
"enabled": false, // starts PubSub service: <true|false>.
},
"users": {
"enabled": false, // starts Users service: <true|false>.
"enabled": false, // starts User service: <true|false>.
"indexes": [], // user profile field indexes
},
"mailer": {
"server": "localhost", // the server to use when sending emails out
"auth_user": "cgrates", // authenticate to email server using this user

View File

@@ -0,0 +1,59 @@
{
// Real-time Charging System for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"rater": {
"enabled": true, // enable Rater service: <true|false>
},
"scheduler": {
"enabled": true, // start Scheduler service: <true|false>
},
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
},
"cdrc": {
"FLATSTORE": {
"enabled": true, // enable CDR client functionality
"cdrs": "internal", // address where to reach CDR server. <internal|x.y.z.y:1234>
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore>
"field_separator": "|", // separator used in case of csv files
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
"max_open_files": 1024, // maximum simultaneous files to process
"data_usage_multiply_factor": 1024, // conversion factor for data usage
"cdr_in_dir": "/tmp/cgr_flatstore/cdrc/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgr_flatstore/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
"cdr_source_id": "flatstore", // free form field, tag identifying the source of the CDRs within CDRS database
"cdr_filter": "", // filter CDR records to import
"partial_record_cache": "1s", // duration to cache partial records when not pairing
"cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "Tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "^*voice", "mandatory": true},
{"tag": "AccId", "cdr_field_id": "accid", "type": "cdrfield", "mandatory": true},
{"tag": "ReqType", "cdr_field_id": "reqtype", "type": "cdrfield", "value": "7", "mandatory": true},
{"tag": "Direction", "cdr_field_id": "direction", "type": "cdrfield", "value": "^out", "mandatory": true},
{"tag": "Tenant", "cdr_field_id": "tenant", "type": "cdrfield", "value": "^cgrates.org", "mandatory": true},
{"tag": "Category", "cdr_field_id": "category", "type": "cdrfield", "value": "^call", "mandatory": true},
{"tag": "Account", "cdr_field_id": "account", "type": "cdrfield", "value": "8", "mandatory": true},
{"tag": "Subject", "cdr_field_id": "subject", "type": "cdrfield", "value": "8", "mandatory": true},
{"tag": "Destination", "cdr_field_id": "destination", "type": "cdrfield", "value": "9", "mandatory": true},
{"tag": "SetupTime", "cdr_field_id": "setup_time", "type": "cdrfield", "value": "6", "mandatory": true},
{"tag": "AnswerTime", "cdr_field_id": "answer_time", "type": "cdrfield", "value": "6", "mandatory": true},
{"tag": "Usage", "cdr_field_id": "usage", "type": "cdrfield", "mandatory": true},
{"tag": "DisconnectCause", "cdr_field_id": "disconnect_cause", "type": "cdrfield", "value": "4;^ ;5", "mandatory": true},
{"tag": "DialogId", "cdr_field_id": "DialogId", "type": "cdrfield", "value": "11"},
],
},
},
}