diff --git a/apier/v1/tprates.go b/apier/v1/tprates.go index 5aae835a1..92f047c8a 100644 --- a/apier/v1/tprates.go +++ b/apier/v1/tprates.go @@ -47,9 +47,10 @@ func (self *ApierV1) GetTPRate(attrs AttrGetTPRate, reply *utils.TPRate) error { return utils.NewErrMandatoryIeMissing(missing...) } if rs, err := self.StorDb.GetTPRates(attrs.TPid, attrs.ID); err != nil { - return utils.NewErrServerError(err) - } else if len(rs) == 0 { - return utils.ErrNotFound + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err } else { *reply = *rs[0] } diff --git a/apier/v1/tprates_it_test.go b/apier/v1/tprates_it_test.go new file mode 100644 index 000000000..848375c03 --- /dev/null +++ b/apier/v1/tprates_it_test.go @@ -0,0 +1,240 @@ +// +build offline_TP + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" +) + +var ( + tpRateCfgPath string + tpRateCfg *config.CGRConfig + tpRateRPC *rpc.Client + tpRateDataDir = "/usr/share/cgrates" + tpRate *utils.TPRate + tpRateDelay int + tpRateConfigDIR string //run tests for specific configuration + +) + +var sTestsTPRates = []func(t *testing.T){ + testTPRatesInitCfg, + testTPRatesResetStorDb, + testTPRatesStartEngine, + testTPRatesRpcConn, + testTPRatesGetTPRateforeSet, + testTPRatesSetTPRate, + testTPRatesGetTPRateAfterSet, + testTPRatesGetTPRateIds, + testTPRatesUpdateTPRate, + testTPRatesGetTPRateAfterUpdate, + testTPRatesRemTPRate, + testTPRatesGetTPRateAfterRemove, + testTPRatesKillEngine, +} + +//Test start here +func TestTPRatesITMySql(t *testing.T) { + tpRateConfigDIR = "tutmysql" + for _, stest := range sTestsTPRates { + t.Run(tpRateConfigDIR, stest) + } +} + +func TestTPRatesITMongo(t *testing.T) { + tpRateConfigDIR = "tutmongo" + for _, stest := range sTestsTPRates { + t.Run(tpRateConfigDIR, stest) + } +} + +func TestTPRatesITPG(t *testing.T) { + tpRateConfigDIR = "tutpostgres" + for _, stest := range sTestsTPRates { + t.Run(tpRateConfigDIR, stest) + } +} + +func testTPRatesInitCfg(t *testing.T) { + var err error + tpRateCfgPath = path.Join(tpRateDataDir, "conf", "samples", tpRateConfigDIR) + tpRateCfg, err = config.NewCGRConfigFromFolder(tpRateCfgPath) + if err != nil { + t.Error(err) + } + tpRateCfg.DataFolderPath = tpRateDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpRateCfg) + switch tpRateConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpRateDelay = 2000 + default: + tpRateDelay = 1000 + } +} + +// Wipe out the cdr database +func testTPRatesResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpRateCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testTPRatesStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpRateCfgPath, tpRateDelay); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testTPRatesRpcConn(t *testing.T) { + var err error + tpRateRPC, err = jsonrpc.Dial("tcp", tpRateCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testTPRatesGetTPRateforeSet(t *testing.T) { + var reply *utils.TPRate + if err := tpRateRPC.Call("ApierV1.GetTPRate", &AttrGetTPRate{TPid: "TPidTpRate", ID: "RT_FS_USERS"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPRatesSetTPRate(t *testing.T) { + tpRate = &utils.TPRate{ + TPid: "TPidTpRate", + ID: "RT_FS_USERS", + RateSlots: []*utils.RateSlot{ + &utils.RateSlot{ + ConnectFee: 12, + Rate: 3, + RateUnit: "6s", + RateIncrement: "6s", + GroupIntervalStart: "0s", + }, + &utils.RateSlot{ + ConnectFee: 12, + Rate: 3, + RateUnit: "4s", + RateIncrement: "6s", + GroupIntervalStart: "1s", + }, + }, + } + var result string + if err := tpRateRPC.Call("ApierV1.SetTPRate", tpRate, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPRatesGetTPRateAfterSet(t *testing.T) { + var reply *utils.TPRate + if err := tpRateRPC.Call("ApierV1.GetTPRate", &AttrGetTPRate{TPid: "TPidTpRate", ID: tpRate.ID}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRate, reply) { + t.Errorf("Expecting : %+v, received: %+v", tpRate, reply) + } +} + +func testTPRatesGetTPRateIds(t *testing.T) { + var result []string + expectedTPID := []string{"RT_FS_USERS"} + if err := tpRateRPC.Call("ApierV1.GetTPRateIds", &AttrGetTPRateIds{TPid: "TPidTpRate"}, &result); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedTPID, result) { + t.Errorf("Expecting: %+v, received: %+v", expectedTPID, result) + } +} + +func testTPRatesUpdateTPRate(t *testing.T) { + var result string + tpRate.RateSlots = []*utils.RateSlot{ + &utils.RateSlot{ + ConnectFee: 12, + Rate: 3, + RateUnit: "6s", + RateIncrement: "6s", + GroupIntervalStart: "0s", + }, + &utils.RateSlot{ + ConnectFee: 12, + Rate: 10, + RateUnit: "4s", + RateIncrement: "6s", + GroupIntervalStart: "1s", + }, + &utils.RateSlot{ + ConnectFee: 5, + Rate: 10, + RateUnit: "4s", + RateIncrement: "6s", + GroupIntervalStart: "3s", + }, + } + if err := tpRateRPC.Call("ApierV1.SetTPRate", tpRate, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPRatesGetTPRateAfterUpdate(t *testing.T) { + var reply *utils.TPRate + if err := tpRateRPC.Call("ApierV1.GetTPRate", &AttrGetTPRate{TPid: "TPidTpRate", ID: tpRate.ID}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRate, reply) { + t.Errorf("Expecting : %+v, received: %+v", tpRate, reply) + } + +} + +func testTPRatesRemTPRate(t *testing.T) { + var resp string + if err := tpRateRPC.Call("ApierV1.RemTPRate", &AttrGetTPRate{TPid: "TPidTpRate", ID: "RT_FS_USERS"}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } +} + +func testTPRatesGetTPRateAfterRemove(t *testing.T) { + var reply *utils.TPRate + if err := tpRateRPC.Call("ApierV1.GetTPRate", &AttrGetTPRate{TPid: "TPidTpRate", ID: "RT_FS_USERS"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPRatesKillEngine(t *testing.T) { + if err := engine.KillEngine(tpRateDelay); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/tpratingprofiles.go b/apier/v1/tpratingprofiles.go index 9888128d6..698ced6cd 100644 --- a/apier/v1/tpratingprofiles.go +++ b/apier/v1/tpratingprofiles.go @@ -46,7 +46,7 @@ type AttrGetTPRatingProfileByLoadId struct { func (self *ApierV1) GetTPRatingProfilesByLoadId(attrs utils.TPRatingProfile, reply *[]*utils.TPRatingProfile) error { mndtryFlds := []string{"TPid", "LoadId"} if len(attrs.Subject) != 0 { // If Subject provided as filter, make all related fields mandatory - mndtryFlds = append(mndtryFlds, "Tenant", "TOR", "Direction", "Subject") + mndtryFlds = append(mndtryFlds, "Tenant", "Category", "Direction", "Subject") } if missing := utils.MissingStructFields(&attrs, mndtryFlds); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) @@ -68,7 +68,7 @@ func (self *ApierV1) GetTPRatingProfileLoadIds(attrs utils.AttrTPRatingProfileId } if ids, err := self.StorDb.GetTpTableIds(attrs.TPid, utils.TBLTPRateProfiles, utils.TPDistinctIds{"loadid"}, map[string]string{ "tenant": attrs.Tenant, - "tor": attrs.Category, + "category": attrs.Category, "direction": attrs.Direction, "subject": attrs.Subject, }, new(utils.Paginator)); err != nil { @@ -96,9 +96,10 @@ func (self *ApierV1) GetTPRatingProfile(attrs AttrGetTPRatingProfile, reply *uti return err } if rpfs, err := self.StorDb.GetTPRatingProfiles(tmpRpf); err != nil { - return utils.NewErrServerError(err) - } else if len(rpfs) == 0 { - return utils.ErrNotFound + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err } else { *reply = *rpfs[0] } diff --git a/apier/v1/tpratingprofiles_it_test.go b/apier/v1/tpratingprofiles_it_test.go new file mode 100644 index 000000000..a01e02de7 --- /dev/null +++ b/apier/v1/tpratingprofiles_it_test.go @@ -0,0 +1,262 @@ +// +build offline_TP + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" +) + +var ( + tpRatingProfileCfgPath string + tpRatingProfileCfg *config.CGRConfig + tpRatingProfileRPC *rpc.Client + tpRatingProfileDataDir = "/usr/share/cgrates" + tpRatingProfile *utils.TPRatingProfile + tpRatingProfileDelay int + tpRatingProfileConfigDIR string //run tests for specific configuration + tpRatingProfileID = "RPrf:*out:Tenant1:Category:Subject" +) + +var sTestsTPRatingProfiles = []func(t *testing.T){ + testTPRatingProfilesInitCfg, + testTPRatingProfilesResetStorDb, + testTPRatingProfilesStartEngine, + testTPRatingProfilesRpcConn, + testTPRatingProfilesGetTPRatingProfileBeforeSet, + testTPRatingProfilesSetTPRatingProfile, + testTPRatingProfilesGetTPRatingProfileAfterSet, + testTPRatingProfilesGetTPRatingProfileLoadIds, + testTPRatingProfilesUpdateTPRatingProfile, + testTPRatingProfilesGetTPRatingProfileAfterUpdate, + testTPRatingProfilesRemTPRatingProfile, + testTPRatingProfilesGetTPRatingProfileAfterRemove, + testTPRatingProfilesKillEngine, +} + +//Test start here +func TestTPRatingProfilesITMySql(t *testing.T) { + tpRatingProfileConfigDIR = "tutmysql" + for _, stest := range sTestsTPRatingProfiles { + t.Run(tpRatingProfileConfigDIR, stest) + } +} + +func TestTPRatingProfilesITMongo(t *testing.T) { + tpRatingProfileConfigDIR = "tutmongo" + for _, stest := range sTestsTPRatingProfiles { + t.Run(tpRatingProfileConfigDIR, stest) + } +} + +func TestTPRatingProfilesITPG(t *testing.T) { + tpRatingProfileConfigDIR = "tutpostgres" + for _, stest := range sTestsTPRatingProfiles { + t.Run(tpRatingProfileConfigDIR, stest) + } +} + +func testTPRatingProfilesInitCfg(t *testing.T) { + var err error + tpRatingProfileCfgPath = path.Join(tpRatingProfileDataDir, "conf", "samples", tpRatingProfileConfigDIR) + tpRatingProfileCfg, err = config.NewCGRConfigFromFolder(tpRatingProfileCfgPath) + if err != nil { + t.Error(err) + } + tpRatingProfileCfg.DataFolderPath = tpRatingProfileDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpRatingProfileCfg) + switch tpRatingProfileConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpRatingProfileDelay = 2000 + default: + tpRatingProfileDelay = 1000 + } +} + +// Wipe out the cdr database +func testTPRatingProfilesResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpRatingProfileCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testTPRatingProfilesStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpRatingProfileCfgPath, tpRatingProfileDelay); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testTPRatingProfilesRpcConn(t *testing.T) { + var err error + tpRatingProfileRPC, err = jsonrpc.Dial("tcp", tpRatingProfileCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testTPRatingProfilesGetTPRatingProfileBeforeSet(t *testing.T) { + var reply *utils.TPRatingProfile + if err := tpRatingProfileRPC.Call("ApierV1.GetTPRatingProfile", &AttrGetTPRatingProfile{TPid: "TPRProf1", RatingProfileId: tpRatingProfileID}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPRatingProfilesSetTPRatingProfile(t *testing.T) { + tpRatingProfile = &utils.TPRatingProfile{ + TPid: "TPRProf1", + LoadId: "RPrf", + Direction: "*out", + Tenant: "Tenant1", + Category: "Category", + Subject: "Subject", + RatingPlanActivations: []*utils.TPRatingActivation{ + &utils.TPRatingActivation{ + ActivationTime: "2014-07-29T15:00:00Z", + RatingPlanId: "PlanOne", + FallbackSubjects: "FallBack", + CdrStatQueueIds: "RandomId", + }, + &utils.TPRatingActivation{ + ActivationTime: "2015-07-29T10:00:00Z", + RatingPlanId: "PlanTwo", + FallbackSubjects: "FallOut", + CdrStatQueueIds: "RandomIdTwo", + }, + }, + } + var result string + if err := tpRatingProfileRPC.Call("ApierV1.SetTPRatingProfile", tpRatingProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPRatingProfilesGetTPRatingProfileAfterSet(t *testing.T) { + var respond *utils.TPRatingProfile + if err := tpRatingProfileRPC.Call("ApierV1.GetTPRatingProfile", &AttrGetTPRatingProfile{TPid: "TPRProf1", RatingProfileId: tpRatingProfileID}, &respond); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRatingProfile.TPid, respond.TPid) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.TPid, respond.TPid) + } else if !reflect.DeepEqual(tpRatingProfile.LoadId, respond.LoadId) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.LoadId, respond.LoadId) + } else if !reflect.DeepEqual(tpRatingProfile.Direction, respond.Direction) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Direction, respond.Direction) + } else if !reflect.DeepEqual(tpRatingProfile.Tenant, respond.Tenant) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Tenant, respond.Tenant) + } else if !reflect.DeepEqual(tpRatingProfile.Category, respond.Category) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Category, respond.Category) + } else if !reflect.DeepEqual(tpRatingProfile.Subject, respond.Subject) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Subject, respond.Subject) + } else if !reflect.DeepEqual(len(tpRatingProfile.RatingPlanActivations), len(respond.RatingPlanActivations)) { + t.Errorf("Expecting : %+v, received: %+v", len(tpRatingProfile.RatingPlanActivations), len(respond.RatingPlanActivations)) + } +} + +func testTPRatingProfilesGetTPRatingProfileLoadIds(t *testing.T) { + var result []string + expected := []string{"RPrf"} + if err := tpRatingProfileRPC.Call("ApierV1.GetTPRatingProfileLoadIds", &utils.AttrTPRatingProfileIds{TPid: tpRatingProfile.TPid, Tenant: tpRatingProfile.Tenant, Category: tpRatingProfile.Category, Direction: tpRatingProfile.Direction, Subject: tpRatingProfile.Subject}, &result); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, result) { + t.Errorf("Expecting: %+v, received: %+v", expected, result) + } +} + +func testTPRatingProfilesUpdateTPRatingProfile(t *testing.T) { + var result string + tpRatingProfile.RatingPlanActivations = []*utils.TPRatingActivation{ + &utils.TPRatingActivation{ + ActivationTime: "2014-07-29T15:00:00Z", + RatingPlanId: "PlanOne", + FallbackSubjects: "FallBack", + CdrStatQueueIds: "RandomId", + }, + &utils.TPRatingActivation{ + ActivationTime: "2015-07-29T10:00:00Z", + RatingPlanId: "PlanTwo", + FallbackSubjects: "FallOut", + CdrStatQueueIds: "RandomIdTwo", + }, + &utils.TPRatingActivation{ + ActivationTime: "2017-07-29T10:00:00Z", + RatingPlanId: "BackupPlan", + FallbackSubjects: "Retreat", + CdrStatQueueIds: "DefenseID", + }, + } + if err := tpRatingProfileRPC.Call("ApierV1.SetTPRatingProfile", tpRatingProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPRatingProfilesGetTPRatingProfileAfterUpdate(t *testing.T) { + var respond *utils.TPRatingProfile + if err := tpRatingProfileRPC.Call("ApierV1.GetTPRatingProfile", &AttrGetTPRatingProfile{TPid: "TPRProf1", RatingProfileId: tpRatingProfileID}, &respond); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRatingProfile.TPid, respond.TPid) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.TPid, respond.TPid) + } else if !reflect.DeepEqual(tpRatingProfile.LoadId, respond.LoadId) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.LoadId, respond.LoadId) + } else if !reflect.DeepEqual(tpRatingProfile.Direction, respond.Direction) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Direction, respond.Direction) + } else if !reflect.DeepEqual(tpRatingProfile.Tenant, respond.Tenant) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Tenant, respond.Tenant) + } else if !reflect.DeepEqual(tpRatingProfile.Category, respond.Category) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Category, respond.Category) + } else if !reflect.DeepEqual(tpRatingProfile.Subject, respond.Subject) { + t.Errorf("Expecting : %+v, received: %+v", tpRatingProfile.Subject, respond.Subject) + } else if !reflect.DeepEqual(len(tpRatingProfile.RatingPlanActivations), len(respond.RatingPlanActivations)) { + t.Errorf("Expecting : %+v, received: %+v", len(tpRatingProfile.RatingPlanActivations), len(respond.RatingPlanActivations)) + } +} + +func testTPRatingProfilesRemTPRatingProfile(t *testing.T) { + var resp string + if err := tpRatingProfileRPC.Call("ApierV1.RemTPRatingProfile", &AttrGetTPRatingProfile{TPid: "TPRProf1", RatingProfileId: tpRatingProfile.GetRatingProfilesId()}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } +} + +func testTPRatingProfilesGetTPRatingProfileAfterRemove(t *testing.T) { + var respond *utils.TPRatingProfile + if err := tpRatingProfileRPC.Call("ApierV1.GetTPRatingProfile", &AttrGetTPRatingProfile{TPid: "TPRProf1", RatingProfileId: tpRatingProfileID}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPRatingProfilesKillEngine(t *testing.T) { + if err := engine.KillEngine(tpRatingProfileDelay); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/tpresources_it_test.go b/apier/v1/tpresources_it_test.go index 97190a87e..7d645577d 100644 --- a/apier/v1/tpresources_it_test.go +++ b/apier/v1/tpresources_it_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build offline_TP /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -89,7 +89,7 @@ func testTPResInitCfg(t *testing.T) { config.SetCgrConfig(tpResCfg) switch tpResConfigDIR { case "tutmongo": // Mongo needs more time to reset db, need to investigate - tpResDelay = 4000 + tpResDelay = 2000 default: tpResDelay = 1000 } diff --git a/apier/v1/tpsharedgroups_it_test.go b/apier/v1/tpsharedgroups_it_test.go index 0f6e311a6..ea17a923c 100644 --- a/apier/v1/tpsharedgroups_it_test.go +++ b/apier/v1/tpsharedgroups_it_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build offline_TP /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -90,9 +90,9 @@ func testTPSharedGroupsInitCfg(t *testing.T) { config.SetCgrConfig(tpSharedGroupCfg) switch tpSharedGroupConfigDIR { case "tutmongo": // Mongo needs more time to reset db - tpSharedGroupDelay = 4000 - default: tpSharedGroupDelay = 2000 + default: + tpSharedGroupDelay = 1000 } } diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index fa8608f78..e895e3cdf 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build offline_TP /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -89,7 +89,7 @@ func testTPStatsInitCfg(t *testing.T) { config.SetCgrConfig(tpStatCfg) switch tpStatConfigDIR { case "tutmongo": // Mongo needs more time to reset db - tpStatDelay = 4000 + tpStatDelay = 2000 default: tpStatDelay = 1000 } diff --git a/apier/v1/tptimings_it_test.go b/apier/v1/tptimings_it_test.go index c98604151..e58df05e0 100644 --- a/apier/v1/tptimings_it_test.go +++ b/apier/v1/tptimings_it_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build offline_TP /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -90,9 +90,9 @@ func testTPTimingsInitCfg(t *testing.T) { config.SetCgrConfig(tpTimingCfg) switch tpTimingConfigDIR { case "tutmongo": // Mongo needs more time to reset db - tpTimingDelay = 4000 - default: tpTimingDelay = 2000 + default: + tpTimingDelay = 1000 } } diff --git a/apier/v1/tpusers_it_test.go b/apier/v1/tpusers_it_test.go index 27a1c891d..1e797e8e0 100644 --- a/apier/v1/tpusers_it_test.go +++ b/apier/v1/tpusers_it_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build offline_TP /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -90,9 +90,9 @@ func testTPUsersInitCfg(t *testing.T) { config.SetCgrConfig(tpUserCfg) switch tpUserConfigDIR { case "tutmongo": // Mongo needs more time to reset db - tpUserDelay = 4000 - default: tpUserDelay = 2000 + default: + tpUserDelay = 1000 } } diff --git a/engine/datamanager.go b/engine/datamanager.go index d38cb417e..c7056143f 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -30,6 +30,11 @@ type DataManager struct { dataDB DataDB } +// DataDB exports access to dataDB +func (dm *DataManager) DataDB() DataDB { + return dm.dataDB +} + // GetStatQueue retrieves a StatQueue from dataDB // handles caching and deserialization of metrics func (dm *DataManager) GetStatQueue(tenant, id string, skipCache bool, transactionID string) (sq *StatQueue, err error) { diff --git a/engine/datamanager_it_test.go b/engine/datamanager_it_test.go index a0d68ae72..2a7c3d0e3 100644 --- a/engine/datamanager_it_test.go +++ b/engine/datamanager_it_test.go @@ -99,7 +99,7 @@ func testDMitCRUDStatQueue(t *testing.T) { }, }, } - cacheKey := utils.StatQueuePrefix + sq.SqID() + cacheKey := utils.StatQueuePrefix + sq.TenantID() if _, rcvErr := dm.GetStatQueue(sq.Tenant, sq.ID, false, ""); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } diff --git a/engine/libstats.go b/engine/libstats.go index 0a6c53189..8f9b6b5e6 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -151,7 +151,7 @@ type StatQueue struct { } // SqID will compose the unique identifier for the StatQueue out of Tenant and ID -func (sq *StatQueue) SqID() string { +func (sq *StatQueue) TenantID() string { return utils.ConcatenatedKey(sq.Tenant, sq.ID) } diff --git a/engine/resources.go b/engine/resources.go index fceea912e..357f1b60b 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -292,7 +292,7 @@ func (rS *ResourceService) storeResources() { break // no more keys, backup completed } if rIf, ok := cache.Get(utils.ResourcesPrefix + rID); !ok || rIf == nil { - utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache resource with ID: %s")) + utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache resource with ID: %s", rID)) } else if err := rS.StoreResource(rIf.(*Resource)); err != nil { failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup } diff --git a/engine/stats.go b/engine/stats.go index 18b775b26..1221968b9 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -19,83 +19,139 @@ along with this program. If not, see package engine import ( + "fmt" "math/rand" + "sync" "time" + + "github.com/cgrates/cgrates/cache" + "github.com/cgrates/cgrates/utils" ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - -/* // NewStatService initializes a StatService -func NewStatService(dataDB DataDB, ms Marshaler, storeInterval time.Duration) (ss *StatService, err error) { - ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, - stopStoring: make(chan struct{})} - sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) - if err != nil { - return nil, err - } - go ss.dumpStoredMetrics() // start dumpStoredMetrics loop - return +func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatService, err error) { + return &StatService{dm: dm, storeInterval: storeInterval, + stopBackup: make(chan struct{})}, nil } // StatService builds stats for events type StatService struct { - dataDB DataDB - ms Marshaler - storeInterval time.Duration - stopStoring chan struct{} + dm *DataManager + storeInterval time.Duration + stopBackup chan struct{} + storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + ssqMux sync.RWMutex // protects storedStatQueues } // ListenAndServe loops keeps the service alive -func (ss *StatService) ListenAndServe(exitChan chan bool) error { +func (sS *StatService) ListenAndServe(exitChan chan bool) error { + go sS.runBackup() // start backup loop e := <-exitChan exitChan <- e // put back for the others listening for shutdown request return nil } -// Called to shutdown the service -// ToDo: improve with context, ie following http implementation -func (ss *StatService) Shutdown() error { +// Shutdown is called to shutdown the service +func (sS *StatService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") - close(ss.stopStoring) - ss.storeMetrics() + close(sS.stopBackup) + sS.storeStats() utils.Logger.Info(" service shutdown complete") return nil } +// runBackup will regularly store resources changed to dataDB +func (sS *StatService) runBackup() { + if sS.storeInterval <= 0 { + return + } + for { + select { + case <-sS.stopBackup: + return + } + sS.storeStats() + } + time.Sleep(sS.storeInterval) +} + +// storeResources represents one task of complete backup +func (sS *StatService) storeStats() { + var failedSqIDs []string + for { // don't stop untill we store all dirty statQueues + sS.ssqMux.Lock() + sID := sS.storedStatQueues.GetOne() + if sID != "" { + delete(sS.storedStatQueues, sID) + } + sS.ssqMux.Unlock() + if sID == "" { + break // no more keys, backup completed + } + if sqIf, ok := cache.Get(utils.StatQueuePrefix + sID); !ok || sqIf == nil { + utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache stat queue with ID: %s", sID)) + } else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil { + failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup + } + // randomize the CPU load and give up thread control + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + } + if len(failedSqIDs) != 0 { // there were errors on save, schedule the keys for next backup + sS.ssqMux.Lock() + for _, sqID := range failedSqIDs { + sS.storedStatQueues[sqID] = true + } + sS.ssqMux.Unlock() + } +} + +// StoreStatQueue stores the statQueue in DB and corrects dirty flag +func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { + if sq.dirty == nil || !*sq.dirty { + return + } + if err = sS.dm.SetStatQueue(sq); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed saving StatQueue with ID: %s, error: %s", + sq.TenantID(), err.Error())) + } else { + *sq.dirty = false + } + return +} + +/* // setQueue adds or modifies a queue into cache -// sort will reorder the ss.queues +// sort will reorder the sS.queues func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { - sq, err := ss.dataDB.GetStatsConfig(qID) + sq, err := sS.dataDB.GetStatsConfig(qID) if err != nil { return nil, err } - return NewStatQueue(ss.evCache, ss.ms, sq, sqSM) + return NewStatQueue(sS.evCache, sS.ms, sq, sqSM) } func (ss *StatService) setQueue(q *StatQueue) { - ss.queuesCache[q.cfg.ID] = q - ss.queues = append(ss.queues, q) + sS.queuesCache[q.cfg.ID] = q + sS.queues = append(sS.queues, q) } // remQueue will remove a queue based on it's ID func (ss *StatService) remQueue(qID string) (si *StatQueue) { - si = ss.queuesCache[qID] - ss.queues.remWithID(qID) - delete(ss.queuesCache, qID) + si = sS.queuesCache[qID] + sS.queues.remWithID(qID) + delete(sS.queuesCache, qID) return } // store stores the necessary storedMetrics to dataDB func (ss *StatService) storeMetrics() { - for _, si := range ss.queues { + for _, si := range sS.queues { if !si.cfg.Store || !si.dirty { // no need to save continue } if siSM := si.GetStoredMetrics(); siSM != nil { - if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { + if err := sS.dataDB.SetSQStoredMetrics(siSM); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", si.cfg.ID, err.Error())) @@ -111,11 +167,11 @@ func (ss *StatService) storeMetrics() { func (ss *StatService) dumpStoredMetrics() { for { select { - case <-ss.stopStoring: + case <-sS.stopStoring: return } - ss.storeMetrics() - time.Sleep(ss.storeInterval) + sS.storeMetrics() + time.Sleep(sS.storeInterval) } } @@ -125,7 +181,7 @@ func (ss *StatService) processEvent(ev StatsEvent) (err error) { if evStatsID == "" { // ID is mandatory return errors.New("missing ID field") } - for _, stInst := range ss.queues { + for _, stInst := range sS.queues { if err := stInst.ProcessEvent(ev); err != nil { utils.Logger.Warning( fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", @@ -140,7 +196,7 @@ func (ss *StatService) processEvent(ev StatsEvent) (err error) { // V1ProcessEvent implements StatV1 method for processing an Event func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) { - if err = ss.processEvent(ev); err == nil { + if err = sS.processEvent(ev); err == nil { *reply = utils.OK } return @@ -148,10 +204,10 @@ func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) // V1GetQueueIDs returns list of queue IDs configured in the service func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - if len(ss.queuesCache) == 0 { + if len(sS.queuesCache) == 0 { return utils.ErrNotFound } - for k := range ss.queuesCache { + for k := range sS.queuesCache { *reply = append(*reply, k) } return @@ -159,7 +215,7 @@ func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err err // V1GetStringMetrics returns the metrics as string values func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { - sq, has := ss.queuesCache[queueID] + sq, has := sS.queuesCache[queueID] if !has { return utils.ErrNotFound } @@ -173,7 +229,7 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri // V1GetFloatMetrics returns the metrics as float64 values func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - sq, has := ss.queuesCache[queueID] + sq, has := sS.queuesCache[queueID] if !has { return utils.ErrNotFound } @@ -195,7 +251,7 @@ type ArgsLoadQueues struct { func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { qIDs := args.QueueIDs if qIDs == nil { - sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + sqPrfxs, err := sS.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) if err != nil { return err } @@ -212,10 +268,10 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err } var sQs []*StatQueue // cache here so we lock only later when data available for _, qID := range *qIDs { - if _, hasPrev := ss.queuesCache[qID]; hasPrev { + if _, hasPrev := sS.queuesCache[qID]; hasPrev { continue // don't overwrite previous, could be extended in the future by carefully checking cached events } - if q, err := ss.loadQueue(qID); err != nil { + if q, err := sS.loadQueue(qID); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", q.cfg.ID, err.Error())) continue @@ -223,12 +279,12 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err sQs = append(sQs, q) } } - ss.Lock() + sS.Lock() for _, q := range sQs { - ss.setQueue(q) + sS.setQueue(q) } - ss.queues.Sort() - ss.Unlock() + sS.queues.Sort() + sS.Unlock() *reply = utils.OK return } diff --git a/offline_tp_test.sh b/offline_tp_test.sh new file mode 100755 index 000000000..c2c7cc216 --- /dev/null +++ b/offline_tp_test.sh @@ -0,0 +1,6 @@ +./test.sh +gen=$? +echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline_TP' +go test github.com/cgrates/cgrates/apier/v1 -tags=offline_TP + +exit $gen