From 1c0f94cef0d191a9e0ca79365984d24a5740ef96 Mon Sep 17 00:00:00 2001 From: TeoV Date: Sun, 21 Jul 2019 13:52:58 +0300 Subject: [PATCH] Add an integration test for LoaderS --- data/conf/samples/tutmongo/cgrates.json | 36 +++++ data/conf/samples/tutmysql/cgrates.json | 27 ---- loaders/loader_it_test.go | 173 ++++++++++++++++++++++++ 3 files changed, 209 insertions(+), 27 deletions(-) diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index eedf38330..ae0280593 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -118,6 +118,42 @@ }, +"loaders": [ + { + "id": "CustomLoader", // identifier of the Loader + "enabled": true, // starts as service: . + "dry_run": false, // do not send the CDRs to CDRS, just parse them + "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify + "lock_filename": ".cgr.lock", // Filename containing concurrency lock in case of delayed processing + "caches_conns": [ + {"address": "*internal"}, // address where to reach the CacheS for data reload, empty for no reloads <""|*internal|x.y.z.y:1234> + ], + "field_separator": ",", // separator used in case of csv files + "tp_in_dir": "/tmp/In", // absolute path towards the directory where the CDRs are stored + "tp_out_dir": "/tmp/Out", // absolute path towards the directory where processed CDRs will be moved + "data":[ + { + "type": "*attributes", // data source type + "file_name": "Attributes.csv", // file name in the tp_in_dir + "fields": [ + {"tag": "TenantID", "field_id": "Tenant", "type": "*composed", "value": "~0", "mandatory": true}, + {"tag": "ProfileID", "field_id": "ID", "type": "*composed", "value": "~1", "mandatory": true}, + {"tag": "Contexts", "field_id": "Contexts", "type": "*composed", "value": "~2"}, + {"tag": "FilterIDs", "field_id": "FilterIDs", "type": "*composed", "value": "~3"}, + {"tag": "ActivationInterval", "field_id": "ActivationInterval", "type": "*composed", "value": "~4"}, + {"tag": "AttributeFilterIDs", "field_id": "AttributeFilterIDs", "type": "*composed", "value": "~5"}, + {"tag": "FieldName", "field_id": "FieldName", "type": "*composed", "value": "~6"}, + {"tag": "Type", "field_id": "Type", "type": "*composed", "value": "~7"}, + {"tag": "Value", "field_id": "Value", "type": "*composed", "value": "~8"}, + {"tag": "Blocker", "field_id": "Blocker", "type": "*composed", "value": "~9"}, + {"tag": "Weight", "field_id": "Weight", "type": "*composed", "value": "~10"}, + ], + }, + ], + }, +], + + "sessions": { "enabled": true, "rals_conns": [ diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 8ee63d3de..cb6125280 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -102,33 +102,6 @@ }, ], }, - { - "id": "FilterLoader", // identifier of the Loader - "enabled": false, // starts as service: . - "dry_run": false, // do not send the CDRs to CDRS, just parse them - "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify - "lock_filename": ".cgr.lock", // Filename containing concurrency lock in case of delayed processing - "caches_conns": [ - {"address": "*internal"}, // address where to reach the CacheS for data reload, empty for no reloads <""|*internal|x.y.z.y:1234> - ], - "field_separator": ",", // separator used in case of csv files - "tp_in_dir": "/tmp/FilterIn", // absolute path towards the directory where the CDRs are stored - "tp_out_dir": "/tmp/FilterOut", // absolute path towards the directory where processed CDRs will be moved - "data":[ - { - "type": "*filters", // data source type - "file_name": "Filters.csv", // file name in the tp_in_dir - "fields": [ - {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~0", "mandatory": true}, - {"tag": "ID", "field_id": "ID", "type": "*composed", "value": "~1", "mandatory": true}, - {"tag": "FilterType", "field_id": "FilterType", "type": "*composed", "value": "~2"}, - {"tag": "FilterFieldName", "field_id": "FilterFieldName", "type": "*composed", "value": "~3"}, - {"tag": "FilterFieldValues", "field_id": "FilterFieldValues", "type": "*composed", "value": "~4"}, - {"tag": "ActivationInterval", "field_id": "ActivationInterval", "type": "*composed", "value": "~5"}, - ], - }, - ], - }, ], diff --git a/loaders/loader_it_test.go b/loaders/loader_it_test.go index 246ffc094..aef0c1f6a 100644 --- a/loaders/loader_it_test.go +++ b/loaders/loader_it_test.go @@ -18,3 +18,176 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ package loaders + +import ( + "io/ioutil" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + "reflect" + "sort" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + loaderCfgPath string + loaderCfg *config.CGRConfig + loaderRPC *rpc.Client + loaderDataDir = "/usr/share/cgrates" + loaderConfigDIR string //run tests for specific configuration + loaderPathIn, loaderPathOut string +) + +var sTestsLoader = []func(t *testing.T){ + testLoaderInitCfg, + testLoaderResetDataDB, + testLoaderStartEngine, + testLoaderRPCConn, + testLoaderMakeFolders, + testLoaderPopulateData, + testLoaderLoadAttributes, + testLoaderVerifyOutDir, + testLoaderCheckAttributes, + testLoaderKillEngine, +} + +//Test start here +func TestLoaderITMySql(t *testing.T) { + loaderConfigDIR = "tutmysql" + for _, stest := range sTestsLoader { + t.Run(loaderConfigDIR, stest) + } +} + +func TestLoaderITMongo(t *testing.T) { + loaderConfigDIR = "tutmongo" + for _, stest := range sTestsLoader { + t.Run(loaderConfigDIR, stest) + } +} + +func testLoaderInitCfg(t *testing.T) { + var err error + loaderCfgPath = path.Join(loaderDataDir, "conf", "samples", loaderConfigDIR) + loaderCfg, err = config.NewCGRConfigFromPath(loaderCfgPath) + if err != nil { + t.Error(err) + } + loaderCfg.DataFolderPath = loaderDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(loaderCfg) +} + +// Wipe out the cdr database +func testLoaderResetDataDB(t *testing.T) { + if err := engine.InitDataDb(loaderCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testLoaderStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(loaderCfgPath, 100); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testLoaderRPCConn(t *testing.T) { + var err error + loaderRPC, err = jsonrpc.Dial("tcp", loaderCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testLoaderMakeFolders(t *testing.T) { + for _, dir := range []string{loaderCfg.LoaderCfg()[1].TpInDir, loaderCfg.LoaderCfg()[1].TpOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + loaderPathIn = loaderCfg.LoaderCfg()[1].TpInDir + loaderPathOut = loaderCfg.LoaderCfg()[1].TpOutDir +} + +func testLoaderPopulateData(t *testing.T) { + fileName := utils.AttributesCsv + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(engine.AttributesCSVContent), 0777); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join(loaderPathIn, fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func testLoaderLoadAttributes(t *testing.T) { + var reply string + if err := loaderRPC.Call(utils.LoaderSv1Load, + &ArgsProcessFolder{LoaderID: "CustomLoader"}, &reply); err != nil { + t.Error(err) + } +} + +func testLoaderVerifyOutDir(t *testing.T) { + time.Sleep(100 * time.Millisecond) + if outContent1, err := ioutil.ReadFile(path.Join(loaderPathOut, utils.AttributesCsv)); err != nil { + t.Error(err) + } else if engine.AttributesCSVContent != string(outContent1) { + t.Errorf("Expecting: %q, received: %q", engine.AttributesCSVContent, string(outContent1)) + } +} + +func testLoaderCheckAttributes(t *testing.T) { + eAttrPrf := &engine.AttributeProfile{ + Tenant: "cgrates.org", + ID: "ALS1", + Contexts: []string{"con1", "con2", "con3"}, + FilterIDs: []string{"*string:~Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 29, 15, 0, 0, 0, time.UTC)}, + Attributes: []*engine.Attribute{ + &engine.Attribute{ + FilterIDs: []string{"*string:~Field1:Initial"}, + FieldName: "Field1", + Type: utils.MetaVariable, + Value: config.NewRSRParsersMustCompile("Sub1", true, utils.INFIELD_SEP), + }, + &engine.Attribute{ + FilterIDs: []string{}, + FieldName: "Field2", + Type: utils.MetaVariable, + Value: config.NewRSRParsersMustCompile("Sub2", true, utils.INFIELD_SEP), + }}, + Blocker: true, + Weight: 20, + } + + var reply *engine.AttributeProfile + if err := loaderRPC.Call("ApierV1.GetAttributeProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "ALS1"}, &reply); err != nil { + t.Fatal(err) + } + eAttrPrf.Compile() + reply.Compile() + sort.Strings(eAttrPrf.Contexts) + sort.Strings(reply.Contexts) + if !reflect.DeepEqual(eAttrPrf, reply) { + t.Errorf("Expecting : %+v, received: %+v", eAttrPrf, reply) + } +} + +func testLoaderKillEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +}