diff --git a/data/conf/samples/cdrsonexpamqp/aws_amqp.json b/data/conf/samples/cdrsonexpamqp/aws_amqp.json new file mode 100644 index 000000000..769f29ebf --- /dev/null +++ b/data/conf/samples/cdrsonexpamqp/aws_amqp.json @@ -0,0 +1,63 @@ +{ +// CGRateS Configuration file +// +// Used in apier_local_tests +// Starts rater, cdrs and mediator connecting over internal channel + +"general": { + "log_level": 7, + "poster_attempts": 1, +}, + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + +"rals": { + "enabled": true, // enable Rater service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: + "store_cdrs": false, // store cdrs in storDb + "online_cdr_exports": [ "amqp_test_file"/*,"amqp_localhost"*/], +}, + + +"cdre": { + "amqp_localhost": { + "export_format": "*aws_json_map", + // "export_path": "amqps://cgrates:password@host:5671/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs", + "attempts": 3, + "content_fields": [ // template of the exported content fields + {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, + {"tag":"RunID", "type": "*composed", "value": "~RunID", "field_id": "RunID"}, + {"tag":"TOR", "type": "*composed", "value": "~ToR", "field_id": "ToR"}, + {"tag":"OriginID", "type": "*composed", "value": "~OriginID", "field_id": "OriginID"}, + {"tag":"OriginHost", "type": "*composed", "value": "~OriginHost", "field_id": "OriginHost"}, + {"tag":"RequestType", "type": "*composed", "value": "~RequestType", "field_id": "RequestType"}, + {"tag":"Direction", "type": "*composed", "value": "~Direction", "field_id": "Direction"}, + {"tag":"Tenant", "type": "*composed", "value": "~Tenant", "field_id": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "~Category", "field_id": "Category"}, + {"tag":"Account", "type": "*composed", "value": "~Account", "field_id": "Account"}, + {"tag":"Subject", "type": "*composed", "value": "~Subject", "field_id": "Subject"}, + {"tag":"Destination", "type": "*composed", "value": "~Destination", "field_id": "Destination"}, + {"tag":"SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"}, + {"tag":"AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"}, + {"tag":"Usage", "type": "*composed", "value": "~Usage", "field_id": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "~Cost", "field_id": "Cost"}, + ], + }, + "amqp_test_file": { + "export_format": "*aws_json_map", + "export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "content_fields": [ + {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, + ], + }, +}, + +} \ No newline at end of file diff --git a/engine/poster.go b/engine/poster.go index 5dbfc4238..59ea1a99d 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -360,17 +360,7 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) { // writeToFile writes the content in the file with fileName on amqp.fallbackFileDir func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error) { - fallbackFilePath := path.Join(pstr.fallbackFileDir, fileName) - _, err = guardian.Guardian.Guard(func() (interface{}, error) { - fileOut, err := os.Create(fallbackFilePath) - if err != nil { - return nil, err - } - _, err = fileOut.Write(content) - fileOut.Close() - return nil, err - }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) - return + return writeToFile(pstr.fallbackFileDir, fileName, content) } func NewAWSPoster(dialURL string, attempts int, fallbackFileDir string) (*AWSPoster, error) { @@ -476,7 +466,11 @@ func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) { // writeToFile writes the content in the file with fileName on amqp.fallbackFileDir func (pstr *AWSPoster) writeToFile(fileName string, content []byte) (err error) { - fallbackFilePath := path.Join(pstr.fallbackFileDir, fileName) + return writeToFile(pstr.fallbackFileDir, fileName, content) +} + +func writeToFile(fileDir, fileName string, content []byte) (err error) { + fallbackFilePath := path.Join(fileDir, fileName) _, err = guardian.Guardian.Guard(func() (interface{}, error) { fileOut, err := os.Create(fallbackFilePath) if err != nil { diff --git a/general_tests/cdrs_onlexp_aws_it_test.go b/general_tests/cdrs_onlexp_aws_it_test.go new file mode 100644 index 000000000..5b34dbf3d --- /dev/null +++ b/general_tests/cdrs_onlexp_aws_it_test.go @@ -0,0 +1,189 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "io/ioutil" + "os" + "path" + "reflect" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" + amqpv1 "github.com/vcabbage/amqp" +) + +var cdrsAWSCfgPath string +var cdrsAWSCfg *config.CGRConfig +var cdrsAWSRpc *rpcclient.RpcClient + +func TestCDRsOnExpAWSInitConfig(t *testing.T) { + var err error + cdrsAWSCfgPath = path.Join(*dataDir, "conf", "samples", "cdrsonexpamqp") + if cdrsAWSCfg, err = config.NewCGRConfigFromFolder(cdrsAWSCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestCDRsOnExpAWSInitCdrDb(t *testing.T) { + if err := engine.InitStorDb(cdrsAWSCfg); err != nil { + t.Fatal(err) + } + + if err := os.RemoveAll(cdrsAWSCfg.GeneralCfg().FailedPostsDir); err != nil { + t.Fatal("Error removing folder: ", cdrsAWSCfg.GeneralCfg().FailedPostsDir, err) + } + + if err := os.Mkdir(cdrsAWSCfg.GeneralCfg().FailedPostsDir, 0700); err != nil { + t.Error(err) + } + +} + +func TestCDRsOnExpAWSStartMasterEngine(t *testing.T) { + if _, err := engine.StopStartEngine(cdrsAWSCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestCDRsOnExpAWSProccesCDR(t *testing.T) { + cdrsAWSRpc, err = rpcclient.NewRpcClient("tcp", cdrsAWSCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } + testCdr1 := &engine.CDR{ + CGRID: utils.Sha1("httpjsonrpc1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()), + ToR: utils.VOICE, + OriginID: "httpjsonrpc1", + OriginHost: "192.168.1.1", + Source: "UNKNOWN", + RequestType: utils.META_PSEUDOPREPAID, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), + AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), + Usage: time.Duration(10) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + RunID: utils.DEFAULT_RUNID, + Cost: 1.201, + PreRated: true, + } + var reply string + if err := cdrsAWSRpc.Call("CdrsV2.ProcessCdr", testCdr1, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) +} + +func TestCDRsOnExpAWSAMQPData(t *testing.T) { + // client, err := + amqpv1.Dial("amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs") + // if err != nil { + // t.Fatal("Dialing AMQP server:", err) + // } + // defer client.Close() + // // Open a session + // session, err := client.NewSession() + // if err != nil { + // t.Fatal("Creating AMQP session:", err) + // } + // ctx := context.Background() + // defer session.Close(ctx) + + // receiver, err := session.NewReceiver(amqpv1.LinkSourceAddress("/cgrates_cdrs")) + // if err != nil { + // t.Fatal("Creating receiver link:", err) + // } + // defer func() { + // ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + // receiver.Close(ctx) + // cancel() + // }() + + // ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + // // Receive next message + // msg, err := receiver.Receive(ctx) + // if err != nil { + // t.Fatal("Reading message from AMQP1.0:", err) + // } + + // // Accept message + // msg.Accept() + + // msgData := msg.GetData() + // cancel() + + // var rcvCDR map[string]string + // if err := json.Unmarshal(msgData, &rcvCDR); err != nil { + // t.Error(err) + // } + // if rcvCDR[utils.CGRID] != utils.Sha1("httpjsonrpc1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()) { + // t.Errorf("Unexpected CDR received: %+v", rcvCDR) + // } +} + +func TestCDRsOnExpAWSAMQPPosterFileFailover(t *testing.T) { + time.Sleep(time.Duration(10 * time.Second)) + failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)} + filesInDir, _ := ioutil.ReadDir(cdrsAWSCfg.GeneralCfg().FailedPostsDir) + if len(filesInDir) == 0 { + t.Fatalf("No files in directory: %s", cdrsAWSCfg.GeneralCfg().FailedPostsDir) + } + var foundFile bool + var fileName string + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + fileName = file.Name() + if strings.HasPrefix(fileName, "cdr|*aws_json_map") { + foundFile = true + filePath := path.Join(cdrsAWSCfg.GeneralCfg().FailedPostsDir, fileName) + if readBytes, err := ioutil.ReadFile(filePath); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(failoverContent[0], readBytes) && !reflect.DeepEqual(failoverContent[1], readBytes) { // Checking just the prefix should do since some content is dynamic + t.Errorf("Expecting: %v or %v, received: %v", string(failoverContent[0]), string(failoverContent[1]), string(readBytes)) + } + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + } + } + if !foundFile { + t.Fatal("Could not find the file in folder") + } +} + +func TestCDRsOnExpAWSStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +}