diff --git a/apier/v1/cdre.go b/apier/v1/cdre.go index d57f0b894..73db68491 100644 --- a/apier/v1/cdre.go +++ b/apier/v1/cdre.go @@ -273,6 +273,8 @@ func (api *APIerSv1) ExportCDRs(arg ArgExportCDRs, reply *RplExportedCDRs) (err filePath = path.Join(eDir, fileName) case utils.DRYRUN: filePath = utils.DRYRUN + case utils.MetaAMQPjsonMap: + filePath = eDir default: u, _ := url.Parse(eDir) u.Path = path.Join(u.Path, fileName) diff --git a/apier/v1/cdre_amqp_it_test.go b/apier/v1/cdre_amqp_it_test.go new file mode 100644 index 000000000..6771fec8e --- /dev/null +++ b/apier/v1/cdre_amqp_it_test.go @@ -0,0 +1,252 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "net/rpc" + "path" + "reflect" + "sort" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + amqp "github.com/rabbitmq/amqp091-go" +) + +var ( + amqpCfgPath string + amqpCfg *config.CGRConfig + amqpRPC *rpc.Client + amqpConfigDIR string + + sTestsCDReAMQP = []func(t *testing.T){ + testAMQPInitCfg, + testAMQPInitDataDb, + testAMQPResetStorDb, + testAMQPStartEngine, + testAMQPRPCConn, + testAMQPAddCDRs, + testAMQPExportCDRs, + testAMQPVerifyExport, + testAMQPKillEngine, + } +) + +func TestAMQPExport(t *testing.T) { + amqpConfigDIR = "cdre_amqp" + for _, stest := range sTestsCDReAMQP { + t.Run(amqpConfigDIR, stest) + } +} + +func testAMQPInitCfg(t *testing.T) { + var err error + amqpCfgPath = path.Join(alsPrfDataDir, "conf", "samples", amqpConfigDIR) + amqpCfg, err = config.NewCGRConfigFromPath(amqpCfgPath) + if err != nil { + t.Fatal(err) + } + amqpCfg.DataFolderPath = alsPrfDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(amqpCfg) +} + +func testAMQPInitDataDb(t *testing.T) { + if err := engine.InitDataDb(amqpCfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPResetStorDb(t *testing.T) { + if err := engine.InitStorDb(amqpCfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(amqpCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testAMQPRPCConn(t *testing.T) { + var err error + amqpRPC, err = newRPCClient(amqpCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testAMQPAddCDRs(t *testing.T) { + storedCdrs := []*engine.CDR{ + { + CGRID: "Cdr1", + OrderID: 101, + ToR: utils.VOICE, + OriginID: "OriginCDR1", + OriginHost: "192.168.1.1", + Source: "test", + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Now(), + AnswerTime: time.Now(), + RunID: utils.MetaDefault, + Usage: time.Duration(10) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, + }, + { + CGRID: "Cdr2", + OrderID: 102, + ToR: utils.VOICE, + OriginID: "OriginCDR2", + OriginHost: "192.168.1.1", + Source: "test2", + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Now(), + AnswerTime: time.Now(), + RunID: utils.MetaDefault, + Usage: time.Duration(5) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, + }, + { + CGRID: "Cdr3", + OrderID: 103, + ToR: utils.VOICE, + OriginID: "OriginCDR3", + OriginHost: "192.168.1.1", + Source: "test2", + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Now(), + AnswerTime: time.Now(), + RunID: utils.MetaDefault, + Usage: time.Duration(30) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, + }, + { + CGRID: "Cdr4", + OrderID: 104, + ToR: utils.VOICE, + OriginID: "OriginCDR4", + OriginHost: "192.168.1.1", + Source: "test3", + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Now(), + AnswerTime: time.Time{}, + RunID: utils.MetaDefault, + Usage: time.Duration(0) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + }, + } + for _, cdr := range storedCdrs { + var reply string + if err := amqpRPC.Call(utils.CDRsV1ProcessCDR, &engine.CDRWithArgDispatcher{CDR: cdr}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + } + time.Sleep(100 * time.Millisecond) +} + +func testAMQPExportCDRs(t *testing.T) { + attr := ArgExportCDRs{ + ExportArgs: map[string]interface{}{ + utils.ExportTemplate: "AMQP_EXPORTER", + }, + Verbose: true, + } + var rply RplExportedCDRs + if err := amqpRPC.Call(utils.APIerSv1ExportCDRs, attr, &rply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(rply.ExportedCGRIDs) != 2 { + t.Errorf("Unexpected number of CDR exported: %s ", utils.ToJSON(rply)) + } +} + +func testAMQPVerifyExport(t *testing.T) { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + t.Fatal(err) + } + defer ch.Close() + q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + msgs, err := ch.Consume(q.Name, utils.EmptyString, true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + expCDRs := []string{ + `{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`, + `{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`, + } + rcvCDRs := make([]string, 0) + waiting := true + for waiting { + select { + case d := <-msgs: + rcvCDRs = append(rcvCDRs, string(d.Body)) + case <-time.After(100 * time.Millisecond): + waiting = false + } + } + sort.Strings(rcvCDRs) + if !reflect.DeepEqual(rcvCDRs, expCDRs) { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs) + } +} + +func testAMQPKillEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/data/conf/samples/cdre_amqp/cgrates.json b/data/conf/samples/cdre_amqp/cgrates.json new file mode 100644 index 000000000..8763bf1d5 --- /dev/null +++ b/data/conf/samples/cdre_amqp/cgrates.json @@ -0,0 +1,62 @@ +{ + +"general": { + "log_level": 7, + "connect_timeout": "1h", + "reply_timeout": "1h" +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" + +}, + +"stor_db": { + "db_password": "CGRateS.org" +}, + +"schedulers": { + "enabled": true +}, + +"cdrs": { + "enabled": true +}, + +"cdre": { + "AMQP_EXPORTER": { + "export_format": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&routing_key=cgrates_cdrs", + "filters" :["*string:~*req.Source:test2"], + "synchronous": true, + "fields": [ + {"path": "*exp.CGRID", "type": "*composed", "value": "~*req.CGRID"}, + {"path": "*exp.RunID", "type": "*composed", "value": "~*req.RunID"}, + {"path": "*exp.Source", "type": "*composed", "value": "~*req.Source"}, + {"path": "*exp.OriginID", "type": "*composed", "value": "~*req.OriginID"}, + {"path": "*exp.Tenant", "type": "*composed", "value": "~*req.Tenant"}, + {"path": "*exp.Category", "type": "*composed", "value": "~*req.Category"}, + {"path": "*exp.Account", "type": "*composed", "value": "~*req.Account"}, + {"path": "*exp.Destination", "type": "*composed", "value": "~*req.Destination"}, + {"path": "*exp.Usage", "type": "*composed", "value": "~*req.Usage"}, + {"path": "*exp.Cost", "type": "*composed", "value": "~*req.Cost", "rounding_decimals": 4} + ] + } +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"] +} + + +} + \ No newline at end of file