// +build integration /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ package ers import ( "reflect" "testing" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" _ "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) var ( sqlCfgPath string sqlCfg *config.CGRConfig sqlTests = []func(t *testing.T){ testSQLInitConfig, testSQLInitCdrDb, testSQLInitDB, testSQLReader, testSQLEmptyTable, testSQLInitDB, testSQLReader2, testSQLStop, } cdr = &engine.CDR{ CGRID: "CGRID", RunID: "RunID", } db *gorm.DB dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" ) func TestSQL(t *testing.T) { // sqlCfgPath = path.Join(*dataDir, "conf", "samples", "ers_reload", "disabled") for _, test := range sqlTests { t.Run("TestSQL", test) } } func testSQLInitConfig(t *testing.T) { var err error if sqlCfg, err = config.NewCGRConfigFromJsonStringWithDefaults(`{ "stor_db": { "db_password": "CGRateS.org", }, "ers": { // EventReaderService "enabled": true, // starts the EventReader service: "readers": [ { "id": "mysql", // identifier of the EventReader profile "type": "*sql", // reader type <*file_csv> "run_delay": 1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "*mysql:cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path // "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here "tenant": "cgrates.org", // tenant used by import "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing // "header_fields": [], // template of the import header fields "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "field_id": "CGRID"}, ], // "trailer_fields": [], // template of the import trailer fields }, ], }, }`); err != nil { t.Fatal(err) } utils.Newlogger(utils.MetaSysLog, sqlCfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) } func testSQLInitCdrDb(t *testing.T) { if err := engine.InitStorDb(sqlCfg); err != nil { t.Fatal(err) } } func testSQLInitDB(t *testing.T) { cdr.CGRID = utils.UUIDSha1Prefix() var err error db, err = gorm.Open("mysql", dbConnString) if err != nil { t.Fatal(err) } db = db.Table(utils.CDRsTBL) tx := db.Begin() cdrSql := cdr.AsCDRsql() cdrSql.CreatedAt = time.Now() saved := tx.Save(cdrSql) if saved.Error != nil { tx.Rollback() t.Fatal(err) } tx.Commit() } func testSQLReader(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit) if err != nil { t.Fatal(err) } sqlER.Serve() select { case err = <-rdrErr: t.Error(err) case ev := <-rdrEvents: if ev.rdrCfg.ID != "mysql" { t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID) } expected := &utils.CGREvent{ Tenant: "cgrates.org", ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]interface{}{ "CGRID": cdr.CGRID, }, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) } case <-time.After(time.Second): t.Fatal("Timeout") } } func testSQLEmptyTable(t *testing.T) { rows, err := db.Table(utils.CDRsTBL).Select("*").Rows() if err != nil { t.Error(err) } for rows.Next() { t.Fatal("Expected empty table") } } func testSQLReader2(t *testing.T) { select { case err := <-rdrErr: t.Error(err) case ev := <-rdrEvents: if ev.rdrCfg.ID != "mysql" { t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID) } expected := &utils.CGREvent{ Tenant: "cgrates.org", ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]interface{}{ "CGRID": cdr.CGRID, }, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) } case <-time.After(time.Second): t.Fatal("Timeout") } } func testSQLStop(t *testing.T) { rdrExit <- struct{}{} if err := db.Close(); err != nil { t.Error(err) } }