diff --git a/apier/v1/apier.go b/apier/v1/apier.go index c37400392..d7907d987 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -335,6 +335,11 @@ func (apierSv1 *APIerSv1) LoadRatingProfile(ctx *context.Context, attrs *utils.T if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheRatingProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache reload + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) if err = dbReader.ReloadCache(config.CgrConfig().GeneralCfg().DefaultCaching, true, make(map[string]any), attrs.Tenant); err != nil { return utils.NewErrServerError(err) } @@ -407,6 +412,11 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromStorDb(ctx *context.Context, attrs * if attrs.Caching != nil { caching = *attrs.Caching } + // delay if needed before cache reload + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) // reload cache utils.Logger.Info("APIerSv1.LoadTariffPlanFromStorDb, reloading cache.") if err := dbReader.ReloadCache(caching, true, attrs.APIOpts, apierSv1.Config.GeneralCfg().DefaultTenant); err != nil { @@ -506,6 +516,11 @@ func (apierSv1 *APIerSv1) SetRatingProfile(ctx *context.Context, attrs *utils.At if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheRatingProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) if err := apierSv1.CallCache(utils.IfaceAsString(attrs.APIOpts[utils.CacheOpt]), attrs.Tenant, utils.CacheRatingProfiles, keyID, utils.EmptyString, nil, nil, attrs.APIOpts); err != nil { return utils.APIErrorHandler(err) } @@ -1089,6 +1104,11 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromFolder(ctx *context.Context, attrs * if attrs.Caching != nil { caching = *attrs.Caching } + // delay if needed before cache reload + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) // reload cache utils.Logger.Info("APIerSv1.LoadTariffPlanFromFolder, reloading cache.") if err := loader.ReloadCache(caching, true, attrs.APIOpts, apierSv1.Config.GeneralCfg().DefaultTenant); err != nil { @@ -1161,6 +1181,11 @@ func (apierSv1 *APIerSv1) RemoveTPFromFolder(ctx *context.Context, attrs *utils. if attrs.Caching != nil { caching = *attrs.Caching } + // delay if needed before cache reload + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) // reload cache utils.Logger.Info("APIerSv1.RemoveTPFromFolder, reloading cache.") if err := loader.ReloadCache(caching, true, attrs.APIOpts, apierSv1.Config.GeneralCfg().DefaultTenant); err != nil { @@ -1213,6 +1238,11 @@ func (apierSv1 *APIerSv1) RemoveTPFromStorDB(ctx *context.Context, attrs *AttrLo if attrs.Caching != nil { caching = *attrs.Caching } + // delay if needed before cache reload + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) // reload cache utils.Logger.Info("APIerSv1.RemoveTPFromStorDB, reloading cache.") if err := dbReader.ReloadCache(caching, true, attrs.APIOpts, apierSv1.Config.GeneralCfg().DefaultTenant); err != nil { @@ -1276,6 +1306,11 @@ func (apierSv1 *APIerSv1) RemoveRatingProfile(ctx *context.Context, attr *AttrRe if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheRatingProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) if err := apierSv1.CallCache(utils.IfaceAsString(attr.APIOpts[utils.CacheOpt]), attr.Tenant, utils.CacheRatingProfiles, keyID, utils.EmptyString, nil, nil, attr.APIOpts); err != nil { return utils.APIErrorHandler(err) } diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index ba1ea7aac..db8c7a36a 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -826,11 +826,17 @@ func testApierLoadRatingProfile(t *testing.T) { rpf := &utils.TPRatingProfile{ TPid: utils.TestSQL, LoadId: utils.TestSQL, Tenant: "cgrates.org", Category: "call", Subject: "*any"} + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1LoadRatingProfile, &rpf, &reply); err != nil { t.Error("Got error on APIerSv1.LoadRatingProfile: ", err.Error()) } else if reply != utils.OK { t.Error("Calling APIerSv1.LoadRatingProfile got reply: ", reply) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } } func testApierLoadRatingProfileWithoutTenant(t *testing.T) { @@ -893,11 +899,17 @@ func testApierSetRatingProfile(t *testing.T) { rpa := &utils.TPRatingActivation{ActivationTime: "2012-01-01T00:00:00Z", RatingPlanId: "RETAIL1", FallbackSubjects: "dan2"} rpf := &utils.AttrSetRatingProfile{Tenant: "cgrates.org", Category: "call", Subject: "dan", RatingPlanActivations: []*utils.TPRatingActivation{rpa}} + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1SetRatingProfile, &rpf, &reply); err != nil { t.Error("Got error on APIerSv1.SetRatingProfile: ", err.Error()) } else if reply != utils.OK { t.Error("Calling APIerSv1.SetRatingProfile got reply: ", reply) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } var rcvStats map[string]*ltcache.CacheStats expectedStats := engine.GetDefaultEmptyCacheStats() // expectedStats[utils.CacheAccountActionPlans].Items = 1 // was removed because the accountActionPlans are only set in DB not in Cache @@ -1622,11 +1634,17 @@ func testApierLoadTariffPlanFromFolder(t *testing.T) { } // Simple test that command is executed without errors attrs = &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testtp")} + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil { t.Error("Got error on APIerSv1.LoadTariffPlanFromFolder: ", err.Error()) } else if reply != utils.OK { t.Error("Calling APIerSv1.LoadTariffPlanFromFolder got reply: ", reply) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } time.Sleep(100 * time.Millisecond) } @@ -1717,11 +1735,17 @@ func testApierSetChargerS(t *testing.T) { }, } var result string + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } } // Make sure balance was topped-up @@ -2041,12 +2065,18 @@ func testApierGetCacheStats2(t *testing.T) { func testApierLoadTariffPlanFromStorDb(t *testing.T) { var reply string + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1LoadTariffPlanFromStorDb, &AttrLoadTpFromStorDb{TPid: "TEST_TPID2"}, &reply); err != nil { t.Error("Got error on APIerSv1.LoadTariffPlanFromStorDb: ", err.Error()) } else if reply != utils.OK { t.Error("Calling APIerSv1.LoadTariffPlanFromStorDb got reply: ", reply) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } } func testApierStartStopServiceStatus(t *testing.T) { @@ -2114,6 +2144,7 @@ func testApierSetRatingProfileWithoutTenant(t *testing.T) { func testApierRemoveRatingProfilesWithoutTenant(t *testing.T) { var reply string + startTime := time.Now() if err := rater.Call(context.Background(), utils.APIerSv1RemoveRatingProfile, &AttrRemoveRatingProfile{ Category: utils.Call, Subject: "dan3", @@ -2122,6 +2153,11 @@ func testApierRemoveRatingProfilesWithoutTenant(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expected: %s, received: %s ", utils.OK, reply) } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } var result *engine.RatingProfile if err := rater.Call(context.Background(), utils.APIerSv1GetRatingProfile, &utils.AttrGetRatingProfile{Category: utils.Call, Subject: "dan3"}, diff --git a/apier/v1/attributes.go b/apier/v1/attributes.go index f7927c06c..b3b5e6c2f 100644 --- a/apier/v1/attributes.go +++ b/apier/v1/attributes.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -115,7 +116,11 @@ func (apierSv1 *APIerSv1) SetAttributeProfile(ctx *context.Context, alsWrp *engi if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheAttributeProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } - + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) if err := apierSv1.CallCache(utils.IfaceAsString(alsWrp.APIOpts[utils.CacheOpt]), alsWrp.Tenant, utils.CacheAttributeProfiles, alsWrp.TenantID(), utils.EmptyString, &alsWrp.FilterIDs, alsWrp.Contexts, alsWrp.APIOpts); err != nil { return utils.APIErrorHandler(err) @@ -140,6 +145,11 @@ func (apierSv1 *APIerSv1) RemoveAttributeProfile(ctx *context.Context, arg *util if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheAttributeProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheAttributeProfiles, utils.ConcatenatedKey(tnt, arg.ID), utils.EmptyString, nil, nil, arg.APIOpts); err != nil { return utils.APIErrorHandler(err) diff --git a/apier/v1/chargers.go b/apier/v1/chargers.go index d23c4e7fa..de9dd1174 100644 --- a/apier/v1/chargers.go +++ b/apier/v1/chargers.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -85,6 +86,11 @@ func (apierSv1 *APIerSv1) SetChargerProfile(ctx *context.Context, arg *ChargerWi if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheChargerProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ChargerProfile if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheChargerProfiles, arg.TenantID(), utils.EmptyString, &arg.FilterIDs, nil, arg.APIOpts); err != nil { @@ -111,6 +117,11 @@ func (apierSv1 *APIerSv1) RemoveChargerProfile(ctx *context.Context, arg *utils. if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheChargerProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ChargerProfile if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheChargerProfiles, utils.ConcatenatedKey(tnt, arg.ID), utils.EmptyString, nil, nil, arg.APIOpts); err != nil { diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 24e6d5fe7..9adcf6b9e 100644 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -89,6 +90,11 @@ func (apierSv1 *APIerSv1) SetDispatcherProfile(ctx *context.Context, args *Dispa if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for DispatcherProfile if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheDispatcherProfiles, args.TenantID(), utils.EmptyString, &args.FilterIDs, args.Subsystems, args.APIOpts); err != nil { @@ -127,6 +133,11 @@ func (apierSv1 *APIerSv1) RemoveDispatcherProfile(ctx *context.Context, arg *uti if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for DispatcherProfile if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tnt, arg.ID), utils.EmptyString, nil, nil, arg.APIOpts); err != nil { diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 19916448c..925753701 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -19,6 +19,9 @@ along with this program. If not, see package v1 import ( + "fmt" + "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -374,6 +377,11 @@ func (rplSv1 *ReplicatorSv1) SetThresholdProfile(ctx *context.Context, th *engin if err = rplSv1.dm.DataDB().SetThresholdProfileDrv(th.ThresholdProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(th.APIOpts[utils.CacheOpt]), th.Tenant, utils.CacheThresholdProfiles, th.TenantID(), utils.EmptyString, &th.FilterIDs, nil, th.APIOpts); err != nil { return @@ -400,6 +408,11 @@ func (rplSv1 *ReplicatorSv1) SetStatQueueProfile(ctx *context.Context, sq *engin if err = rplSv1.dm.DataDB().SetStatQueueProfileDrv(sq.StatQueueProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(sq.APIOpts[utils.CacheOpt]), sq.Tenant, utils.CacheStatQueueProfiles, sq.TenantID(), utils.EmptyString, &sq.FilterIDs, nil, sq.APIOpts); err != nil { return @@ -452,6 +465,11 @@ func (rplSv1 *ReplicatorSv1) SetResourceProfile(ctx *context.Context, rs *engine if err = rplSv1.dm.DataDB().SetResourceProfileDrv(rs.ResourceProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(rs.APIOpts[utils.CacheOpt]), rs.Tenant, utils.CacheResourceProfiles, rs.TenantID(), utils.EmptyString, &rs.FilterIDs, nil, rs.APIOpts); err != nil { return @@ -530,6 +548,11 @@ func (rplSv1 *ReplicatorSv1) SetRatingProfile(ctx *context.Context, rp *engine.R if err = rplSv1.dm.DataDB().SetRatingProfileDrv(rp.RatingProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(rp.APIOpts[utils.CacheOpt]), rp.Tenant, utils.CacheRatingProfiles, rp.Id, utils.EmptyString, nil, nil, rp.APIOpts); err != nil { return @@ -543,6 +566,11 @@ func (rplSv1 *ReplicatorSv1) SetRouteProfile(ctx *context.Context, sp *engine.Ro if err = rplSv1.dm.DataDB().SetRouteProfileDrv(sp.RouteProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(sp.APIOpts[utils.CacheOpt]), sp.Tenant, utils.CacheRouteProfiles, sp.TenantID(), utils.EmptyString, &sp.FilterIDs, nil, sp.APIOpts); err != nil { return @@ -556,6 +584,11 @@ func (rplSv1 *ReplicatorSv1) SetAttributeProfile(ctx *context.Context, ap *engin if err = rplSv1.dm.DataDB().SetAttributeProfileDrv(ap.AttributeProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(ap.APIOpts[utils.CacheOpt]), ap.Tenant, utils.CacheAttributeProfiles, ap.TenantID(), utils.EmptyString, &ap.FilterIDs, ap.Contexts, ap.APIOpts); err != nil { return @@ -569,6 +602,11 @@ func (rplSv1 *ReplicatorSv1) SetChargerProfile(ctx *context.Context, cp *engine. if err = rplSv1.dm.DataDB().SetChargerProfileDrv(cp.ChargerProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(cp.APIOpts[utils.CacheOpt]), cp.Tenant, utils.CacheChargerProfiles, cp.TenantID(), utils.EmptyString, &cp.FilterIDs, nil, cp.APIOpts); err != nil { return @@ -582,6 +620,11 @@ func (rplSv1 *ReplicatorSv1) SetDispatcherProfile(ctx *context.Context, dpp *eng if err = rplSv1.dm.DataDB().SetDispatcherProfileDrv(dpp.DispatcherProfile); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(dpp.APIOpts[utils.CacheOpt]), dpp.Tenant, utils.CacheDispatcherProfiles, dpp.TenantID(), utils.EmptyString, &dpp.FilterIDs, dpp.Subsystems, dpp.APIOpts); err != nil { return @@ -730,6 +773,11 @@ func (rplSv1 *ReplicatorSv1) RemoveThresholdProfile(ctx *context.Context, args * if err = rplSv1.dm.DataDB().RemThresholdProfileDrv(args.Tenant, args.ID); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheThresholdProfiles, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil { return @@ -743,6 +791,11 @@ func (rplSv1 *ReplicatorSv1) RemoveStatQueueProfile(ctx *context.Context, args * if err = rplSv1.dm.DataDB().RemStatQueueProfileDrv(args.Tenant, args.ID); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheStatQueueProfiles, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil { return @@ -782,6 +835,11 @@ func (rplSv1 *ReplicatorSv1) RemoveResourceProfile(ctx *context.Context, args *u if err = rplSv1.dm.DataDB().RemoveResourceProfileDrv(args.Tenant, args.ID); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheResourceProfiles, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil { return @@ -873,6 +931,11 @@ func (rplSv1 *ReplicatorSv1) RemoveRatingProfile(ctx *context.Context, id *utils if err = rplSv1.dm.DataDB().RemoveRatingProfileDrv(id.Arg); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(id.APIOpts[utils.CacheOpt]), id.Tenant, utils.CacheRatingProfiles, id.Arg, utils.EmptyString, nil, nil, id.APIOpts); err != nil { return @@ -886,6 +949,11 @@ func (rplSv1 *ReplicatorSv1) RemoveRouteProfile(ctx *context.Context, args *util if err = rplSv1.dm.DataDB().RemoveRouteProfileDrv(args.Tenant, args.ID); err != nil { return } + // delay if needed before cache call + if rplSv1.v1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", rplSv1.v1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(rplSv1.v1.Config.GeneralCfg().CachingDelay) if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheRouteProfiles, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil { return diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 2cc0722d4..9b0ee3bfe 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -127,6 +128,11 @@ func (apierSv1 *APIerSv1) SetResourceProfile(ctx *context.Context, arg *engine.R utils.CacheResources: loadID}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ResourceProfile if err = apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheResourceProfiles, arg.TenantID(), utils.EmptyString, &arg.FilterIDs, nil, arg.APIOpts); err != nil { @@ -148,6 +154,11 @@ func (apierSv1 *APIerSv1) RemoveResourceProfile(ctx *context.Context, arg *utils if err := apierSv1.DataManager.RemoveResourceProfile(tnt, arg.ID, true); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ResourceProfile if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheResourceProfiles, utils.ConcatenatedKey(tnt, arg.ID), utils.EmptyString, nil, nil, arg.APIOpts); err != nil { diff --git a/apier/v1/routes.go b/apier/v1/routes.go index ab410c91c..b6da13d11 100644 --- a/apier/v1/routes.go +++ b/apier/v1/routes.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -85,6 +86,11 @@ func (apierSv1 *APIerSv1) SetRouteProfile(ctx *context.Context, args *RouteWithA if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheRouteProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for SupplierProfile if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheRouteProfiles, args.TenantID(), utils.EmptyString, &args.FilterIDs, nil, args.APIOpts); err != nil { @@ -110,6 +116,11 @@ func (apierSv1 *APIerSv1) RemoveRouteProfile(ctx *context.Context, args *utils.T if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheRouteProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for SupplierProfile if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheRouteProfiles, utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, nil, args.APIOpts); err != nil { diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 33c5be393..0d6d84f83 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -83,6 +84,11 @@ func (apierSv1 *APIerSv1) SetStatQueueProfile(ctx *context.Context, arg *engine. if err = apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for StatQueueProfile if err = apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheStatQueueProfiles, arg.TenantID(), utils.EmptyString, &arg.FilterIDs, nil, arg.APIOpts); err != nil { @@ -104,6 +110,11 @@ func (apierSv1 *APIerSv1) RemoveStatQueueProfile(ctx *context.Context, args *uti if err := apierSv1.DataManager.RemoveStatQueueProfile(tnt, args.ID, true); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for StatQueueProfile if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, nil, args.APIOpts); err != nil { diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index e9b63e180..d15b3edd3 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -141,6 +142,11 @@ func (apierSv1 *APIerSv1) SetThresholdProfile(ctx *context.Context, args *engine if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheThresholdProfiles: loadID, utils.CacheThresholds: loadID}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ThresholdProfile and Threshold if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheThresholdProfiles, args.TenantID(), utils.EmptyString, &args.FilterIDs, nil, args.APIOpts); err != nil { @@ -162,6 +168,11 @@ func (apierSv1 *APIerSv1) RemoveThresholdProfile(ctx *context.Context, args *uti if err := apierSv1.DataManager.RemoveThresholdProfile(tnt, args.ID, true); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if apierSv1.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay) //handle caching for ThresholdProfile if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheThresholdProfiles, utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, nil, args.APIOpts); err != nil { diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 99255ef56..4bf9000f1 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -68,6 +68,11 @@ func (apiv2 *APIerSv2) LoadRatingProfile(ctx *context.Context, attrs *AttrLoadRa if err := apiv2.DataManager.SetLoadIDs(map[string]int64{utils.CacheRatingProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache reload + if apiv2.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf("Delaying cache reload for %v", apiv2.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apiv2.Config.GeneralCfg().CachingDelay) if err = dbReader.ReloadCache(config.CgrConfig().GeneralCfg().DefaultCaching, true, make(map[string]any), apiv2.Config.GeneralCfg().DefaultTenant); err != nil { return utils.NewErrServerError(err) } @@ -157,6 +162,11 @@ func (apiv2 *APIerSv2) LoadTariffPlanFromFolder(ctx *context.Context, attrs *uti if attrs.Caching != nil { caching = *attrs.Caching } + // delay if needed before cache reload + if apiv2.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache reload for %v", apiv2.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(apiv2.Config.GeneralCfg().CachingDelay) if err := loader.ReloadCache(caching, true, attrs.APIOpts, apiv2.Config.GeneralCfg().DefaultTimezone); err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v2/attributes.go b/apier/v2/attributes.go index badd917bf..47c2efd3c 100644 --- a/apier/v2/attributes.go +++ b/apier/v2/attributes.go @@ -19,6 +19,7 @@ along with this program. If not, see package v2 import ( + "fmt" "time" "github.com/cgrates/birpc/context" @@ -51,6 +52,11 @@ func (APIerSv2 *APIerSv2) SetAttributeProfile(ctx *context.Context, arg *Attribu map[string]int64{utils.CacheAttributeProfiles: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if APIerSv2.Config.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", APIerSv2.Config.GeneralCfg().CachingDelay)) + } + time.Sleep(APIerSv2.Config.GeneralCfg().CachingDelay) if err := APIerSv2.APIerSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), alsPrf.Tenant, utils.CacheAttributeProfiles, alsPrf.TenantID(), utils.EmptyString, &alsPrf.FilterIDs, alsPrf.Contexts, arg.APIOpts); err != nil { return utils.APIErrorHandler(err) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index efcd9e9c5..1437eecbb 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -415,12 +415,10 @@ func main() { } // delay if needed before cache reload - if ldrCfg.GeneralCfg().CachingDelay != 0 { - if *verbose { - log.Printf("Delaying %v", ldrCfg.GeneralCfg().CachingDelay) - } - time.Sleep(ldrCfg.GeneralCfg().CachingDelay) + if *verbose && ldrCfg.GeneralCfg().CachingDelay != 0 { + log.Printf("Delaying cache reload for %v", ldrCfg.GeneralCfg().CachingDelay) } + time.Sleep(ldrCfg.GeneralCfg().CachingDelay) // reload cache if err = tpReader.ReloadCache(ldrCfg.GeneralCfg().DefaultCaching, *verbose, map[string]any{ diff --git a/cmd/cgr-loader/cgr-loader_it_test.go b/cmd/cgr-loader/cgr-loader_it_test.go index d1d624d25..848d394ae 100644 --- a/cmd/cgr-loader/cgr-loader_it_test.go +++ b/cmd/cgr-loader/cgr-loader_it_test.go @@ -115,6 +115,7 @@ func TestLoadConfig(t *testing.T) { *schedulerAddress = "" // General *cachingArg = utils.MetaLoad + *cachingDlay = 5 * time.Second *dbDataEncoding = utils.MetaJSON *timezone = utils.Local ldrCfg := loadConfig() @@ -138,6 +139,9 @@ func TestLoadConfig(t *testing.T) { if ldrCfg.GeneralCfg().DefaultCaching != utils.MetaLoad { t.Errorf("Expected %s received %s", utils.MetaLoad, ldrCfg.GeneralCfg().DefaultCaching) } + if ldrCfg.GeneralCfg().CachingDelay != 5*time.Second { + t.Errorf("Expected %s received %s", 5*time.Second, ldrCfg.GeneralCfg().CachingDelay) + } if *importID == utils.EmptyString { t.Errorf("Expected importID to be populated") } @@ -238,6 +242,9 @@ var ( testLoadItCheckAttributes2, testLoadItStartLoaderFromStorDB, testLoadItCheckAttributes, + + testLoadItStartLoaderWithDelayWConf, + testLoadItStartLoaderWithDelayWFlag, } ) @@ -444,3 +451,31 @@ func testLoadItStartLoaderFlushStorDB(t *testing.T) { t.Fatal(err) } } + +func testLoadItStartLoaderWithDelayWConf(t *testing.T) { + cmd := exec.Command("cgr-loader", "-config_path="+path.Join(*dataDir, "conf", "samples", "apier_mysql"), "-path="+path.Join(*dataDir, "tariffplans", "tutorial"), "-caches_address=", "-scheduler_address=", "-to_stordb", "-flush_stordb", "-tpid=TPID") + output := bytes.NewBuffer(nil) + outerr := bytes.NewBuffer(nil) + cmd.Stdout = output + cmd.Stderr = outerr + if err := cmd.Run(); err != nil { + t.Log(cmd.Args) + t.Log(output.String()) + t.Log(outerr.String()) + t.Fatal(err) + } +} + +func testLoadItStartLoaderWithDelayWFlag(t *testing.T) { + cmd := exec.Command("cgr-loader", "-path="+path.Join(*dataDir, "tariffplans", "tutorial"), "-caches_address=", "-scheduler_address=", "-to_stordb", "-flush_stordb", "-tpid=TPID", "-caching_delay=5s") + output := bytes.NewBuffer(nil) + outerr := bytes.NewBuffer(nil) + cmd.Stdout = output + cmd.Stderr = outerr + if err := cmd.Run(); err != nil { + t.Log(cmd.Args) + t.Log(output.String()) + t.Log(outerr.String()) + t.Fatal(err) + } +} diff --git a/data/conf/samples/apier_mongo/apier.json b/data/conf/samples/apier_mongo/apier.json index af060bead..a3e42ba30 100644 --- a/data/conf/samples/apier_mongo/apier.json +++ b/data/conf/samples/apier_mongo/apier.json @@ -7,6 +7,7 @@ "general": { "log_level": 7, "poster_attempts": 1, + "caching_delay": "1s", }, diff --git a/data/conf/samples/apier_mysql/apier.json b/data/conf/samples/apier_mysql/apier.json index abd7d9c0b..0b11da234 100644 --- a/data/conf/samples/apier_mysql/apier.json +++ b/data/conf/samples/apier_mysql/apier.json @@ -7,6 +7,7 @@ "general": { "log_level": 7, "poster_attempts": 1, + "caching_delay": "1s", }, diff --git a/general_tests/set_rmv_prfl_dlay_it_test.go b/general_tests/set_rmv_prfl_dlay_it_test.go new file mode 100644 index 000000000..79c419f69 --- /dev/null +++ b/general_tests/set_rmv_prfl_dlay_it_test.go @@ -0,0 +1,749 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/birpc/context" + v2 "github.com/cgrates/cgrates/apier/v2" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestSetRemoveProfilesWithCachingDelay(t *testing.T) { + var cfgDir string + switch *dbType { + case utils.MetaInternal: + t.SkipNow() + case utils.MetaMySQL: + cfgDir = "apier_mysql" + case utils.MetaMongo: + cfgDir = "apier_mongo" + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + testEnv := TestEnvironment{ + Name: "TestSetRemoveProfilesWithCachingDelay", + // Encoding: *encoding, + ConfigPath: path.Join(*dataDir, "conf", "samples", cfgDir), + TpPath: path.Join(*dataDir, "tariffplans", "tutorial"), + } + client, _, shutdown, err := testEnv.Setup(t, *waitRater) + if err != nil { + t.Fatal(err) + } + defer shutdown() + + t.Run("RemoveTPFromFolder", func(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveTPFromFolder, attrs, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("expected reply , received <%v>", reply) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + time.Sleep(100 * time.Millisecond) + }) + + t.Run("SetAttributeProfile", func(t *testing.T) { + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSGetAttributeForEventWihMetaAnyContext", + Event: map[string]any{ + utils.AccountField: "acc", + utils.Destination: "+491511231234", + }, + APIOpts: map[string]any{ + utils.OptsContext: utils.MetaCDRs, + }, + } + eAttrPrf := &engine.AttributeProfileWithAPIOpts{ + AttributeProfile: &engine.AttributeProfile{ + Tenant: ev.Tenant, + ID: "ATTR_1", + FilterIDs: []string{"*string:~*req.Account:acc"}, + Contexts: []string{utils.MetaAny}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 1, 14, 0, 0, 0, 0, time.UTC)}, + Attributes: []*engine.Attribute{ + { + Path: utils.MetaReq + utils.NestingSep + utils.AccountField, + Value: config.NewRSRParsersMustCompile("1001", utils.InfieldSep), + }, + }, + Weight: 10.0, + }, + } + eAttrPrf.Compile() + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetAttributeProfile, eAttrPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveAttributeProfile", func(t *testing.T) { + + eAttrPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "ATTR_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveAttributeProfile, eAttrPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("SetDispatcherProfile", func(t *testing.T) { + + eDspPrf := &engine.DispatcherProfileWithAPIOpts{ + DispatcherProfile: &engine.DispatcherProfile{ + Tenant: "cgrates.org", + ID: "DSP_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, eDspPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveDispatcherProfile", func(t *testing.T) { + + eDspPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "DSP_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveDispatcherProfile, eDspPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("SetResourceProfile", func(t *testing.T) { + + eRscPrf := &engine.ResourceProfileWithAPIOpts{ + ResourceProfile: &engine.ResourceProfile{ + Tenant: "cgrates.org", + ID: "RSC_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetResourceProfile, eRscPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveResourceProfile", func(t *testing.T) { + + eRscPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "RSC_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveResourceProfile, eRscPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("SetRouteProfile", func(t *testing.T) { + + eRoutePrf := &engine.RouteProfileWithAPIOpts{ + RouteProfile: &engine.RouteProfile{ + Tenant: "cgrates.org", + ID: "ROUTE_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetRouteProfile, eRoutePrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveRouteProfile", func(t *testing.T) { + + eRoutePrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "ROUTE_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveRouteProfile, eRoutePrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("SetStatQueueProfile", func(t *testing.T) { + + eSQPrf := &engine.StatQueueProfileWithAPIOpts{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetStatQueueProfile, eSQPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveStatQueueProfile", func(t *testing.T) { + + eSQPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveStatQueueProfile, eSQPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("SetThresholdProfile", func(t *testing.T) { + + eTHPrf := &engine.ThresholdProfileWithAPIOpts{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THRHLD_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1SetThresholdProfile, eTHPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("RemoveThresholdProfile", func(t *testing.T) { + + eTHPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "THRHLD_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv1RemoveThresholdProfile, eTHPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("V2SetAttributeProfile", func(t *testing.T) { + + extAlsPrf := &v2.AttributeWithAPIOpts{ + APIAttributeProfile: &engine.APIAttributeProfile{ + Tenant: "cgrates.org", + ID: "ExternalAttribute", + Contexts: []string{utils.MetaSessionS, utils.MetaCDRs}, + FilterIDs: []string{"*string:~*req.Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + Attributes: []*engine.ExternalAttribute{ + { + Path: utils.MetaReq + utils.NestingSep + "Account", + Value: "1001", + }, + }, + Weight: 20, + }, + } + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv2SetAttributeProfile, extAlsPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("V2LoadTariffPlanFromFolder", func(t *testing.T) { + attrs := &utils.AttrLoadTpFromFolder{ + FolderPath: path.Join(*dataDir, "tariffplans", "tutorial"), + } + + exp := utils.LoadInstance{} + + var result utils.LoadInstance + startTime := time.Now() + if err := client.Call(context.Background(), utils.APIerSv2LoadTariffPlanFromFolder, attrs, &result); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(result, exp) { + t.Errorf("Expected <%+v>, \nreceived \n<%+v>", exp, result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetThresholdProfile", func(t *testing.T) { + + eTHPrf := &engine.ThresholdProfileWithAPIOpts{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THRHLD_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetThresholdProfile, eTHPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetStatQueueProfile", func(t *testing.T) { + + eSQPrf := &engine.StatQueueProfileWithAPIOpts{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetStatQueueProfile, eSQPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetResourceProfile", func(t *testing.T) { + + eRscPrf := &engine.ResourceProfileWithAPIOpts{ + ResourceProfile: &engine.ResourceProfile{ + Tenant: "cgrates.org", + ID: "RSC_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetResourceProfile, eRscPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetRatingProfile", func(t *testing.T) { + + eRatingPrf := &engine.RatingProfileWithAPIOpts{ + Tenant: "cgrates.org", + RatingProfile: &engine.RatingProfile{ + Id: "RATE_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetRatingProfile, eRatingPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetRouteProfile", func(t *testing.T) { + + eRoutePrf := &engine.RouteProfileWithAPIOpts{ + RouteProfile: &engine.RouteProfile{ + Tenant: "cgrates.org", + ID: "ROUTE_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetRouteProfile, eRoutePrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetAttributeProfile", func(t *testing.T) { + + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSGetAttributeForEventWihMetaAnyContext", + Event: map[string]any{ + utils.AccountField: "acc", + utils.Destination: "+491511231234", + }, + APIOpts: map[string]any{ + utils.OptsContext: utils.MetaCDRs, + }, + } + eAttrPrf := &engine.AttributeProfileWithAPIOpts{ + AttributeProfile: &engine.AttributeProfile{ + Tenant: ev.Tenant, + ID: "ATTR_3", + FilterIDs: []string{"*string:~*req.Account:acc"}, + Contexts: []string{utils.MetaAny}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 1, 14, 0, 0, 0, 0, time.UTC)}, + Attributes: []*engine.Attribute{ + { + Path: utils.MetaReq + utils.NestingSep + utils.AccountField, + Value: config.NewRSRParsersMustCompile("1001", utils.InfieldSep), + }, + }, + Weight: 10.0, + }, + } + eAttrPrf.Compile() + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetAttributeProfile, eAttrPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetChargerProfile", func(t *testing.T) { + + eChrgPrf := &engine.ChargerProfileWithAPIOpts{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "CHRG_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetChargerProfile, eChrgPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1SetDispatcherProfile", func(t *testing.T) { + + eDspPrf := &engine.DispatcherProfileWithAPIOpts{ + DispatcherProfile: &engine.DispatcherProfile{ + Tenant: "cgrates.org", + ID: "DSP_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1SetDispatcherProfile, eDspPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1RemoveThresholdProfile", func(t *testing.T) { + + eTHPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "THRHLD_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1RemoveThresholdProfile, eTHPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1RemoveStatQueueProfile", func(t *testing.T) { + + eSQPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1RemoveStatQueueProfile, eSQPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1RemoveResourceProfile", func(t *testing.T) { + eRscPrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "RSC_2", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1RemoveResourceProfile, eRscPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1RemoveRatingProfile", func(t *testing.T) { + eRtingPrf := &utils.StringWithAPIOpts{ + Tenant: "cgrates.org", + Arg: "RATE_1", + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1RemoveRatingProfile, eRtingPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + + t.Run("ReplicatorSv1RemoveRouteProfile", func(t *testing.T) { + eRoutePrf := &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "ROUTE_1", + }, + } + + var result string + startTime := time.Now() + if err := client.Call(context.Background(), utils.ReplicatorSv1RemoveRouteProfile, eRoutePrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } + }) + +} diff --git a/loaders/loader.go b/loaders/loader.go index dda6c1b17..931baf2d4 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -39,7 +39,7 @@ type openedCSVFile struct { } func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, - timezone string, filterS *engine.FilterS, + timezone string, cachingDlay time.Duration, filterS *engine.FilterS, connMgr *engine.ConnManager, cacheConns []string) (ldr *Loader) { ldr = &Loader{ enabled: cfg.Enabled, @@ -51,6 +51,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, lockFilepath: cfg.GetLockFilePath(), fieldSep: cfg.FieldSeparator, runDelay: cfg.RunDelay, + cachingDelay: cachingDlay, dataTpls: make(map[string][]*config.FCTemplate), flagsTpls: make(map[string]utils.FlagsWithParams), rdrs: make(map[string]map[string]*openedCSVFile), @@ -94,6 +95,7 @@ type Loader struct { lockFilepath string fieldSep string runDelay time.Duration + cachingDelay time.Duration dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read @@ -555,7 +557,11 @@ func (ldr *Loader) storeLoadedData(loaderType string, } } } - + // delay if needed before cache reload + if ldr.cachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf("<%v> Delaying cache reload for %v", utils.LoaderS, ldr.cachingDelay)) + } + time.Sleep(ldr.cachingDelay) if len(ldr.cacheConns) != 0 { return engine.CallCache(ldr.connMgr, ldr.cacheConns, caching, cacheArgs, cacheIDs, nil, false, ldr.tenant) } @@ -802,7 +808,11 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa } } } - + // delay if needed before cache reload + if ldr.cachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf("<%v> Delaying cache reload for %v", utils.LoaderS, ldr.cachingDelay)) + } + time.Sleep(ldr.cachingDelay) if len(ldr.cacheConns) != 0 { return engine.CallCache(ldr.connMgr, ldr.cacheConns, caching, cacheArgs, cacheIDs, nil, false, ldr.tenant) } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index 9755aa380..2ca7da92a 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -1434,7 +1434,7 @@ func TestNewLoaderWithMultiFiles(t *testing.T) { Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("10", utils.InfieldSep)}, } - ldr := NewLoader(engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), ldrCfg, "", nil, nil, nil) + ldr := NewLoader(engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), ldrCfg, "", 0, nil, nil, nil) openRdrs := make(utils.StringSet) for _, rdr := range ldr.rdrs { @@ -3086,7 +3086,7 @@ func TestStoreLoadedDataAttributes(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Attributes": { { @@ -3135,7 +3135,7 @@ func TestStoreLoadedDataResources(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Resources": { { @@ -3183,7 +3183,7 @@ func TestStoreLoadedDataFilters(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Filters": { { @@ -3232,7 +3232,7 @@ func TestStoreLoadedDataStats(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "StatsQueue": { { @@ -3281,7 +3281,7 @@ func TestStoreLoadedDataThresholds(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Thresholds": { { @@ -3329,7 +3329,7 @@ func TestStoreLoadedDataRoutes(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Routes": { { @@ -3377,7 +3377,7 @@ func TestStoreLoadedDataChargers(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Chargers": { { @@ -3425,7 +3425,7 @@ func TestStoreLoadedDataDispatchers(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "Dispatchers": { { @@ -3473,7 +3473,7 @@ func TestStoreLoadedDataDispatcherHosts(t *testing.T) { cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} loaderCfg := config.CgrConfig().LoaderCfg() fltrS := engine.NewFilterS(cfg, connMgr, dm) - ldr := NewLoader(dm, loaderCfg[0], "", fltrS, connMgr, cacheConns) + ldr := NewLoader(dm, loaderCfg[0], "", 0, fltrS, connMgr, cacheConns) lds := map[string][]LoaderData{ "DispatcherHosts": { { @@ -3487,3 +3487,56 @@ func TestStoreLoadedDataDispatcherHosts(t *testing.T) { t.Error(err) } } + +func TestStoreLoadedDataWithDelay(t *testing.T) { + engine.Cache.Clear(nil) + cfg := config.NewDefaultCGRConfig() + cfg.GeneralCfg().CachingDelay = 1 * time.Second + argExpect := &utils.AttrReloadCacheWithAPIOpts{ + APIOpts: nil, + Tenant: "", + DispatcherHostIDs: []string{"cgrates.org:dispatcherHostsID"}, + } + cM := &ccMock{ + calls: map[string]func(args any, reply any) error{ + utils.CacheSv1ReloadCache: func(args any, reply any) error { + if !reflect.DeepEqual(args, argExpect) { + t.Errorf("Expected %v \nbut received %v", utils.ToJSON(argExpect), utils.ToJSON(args)) + } + return nil + }, + utils.CacheSv1Clear: func(args any, reply any) error { + return nil + }, + }, + } + + rpcInternal := make(chan birpc.ClientConnector, 1) + rpcInternal <- cM + connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): rpcInternal, + }) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), connMgr) + cacheConns := []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} + loaderCfg := config.CgrConfig().LoaderCfg() + fltrS := engine.NewFilterS(cfg, connMgr, dm) + ldr := NewLoader(dm, loaderCfg[0], "", cfg.GeneralCfg().CachingDelay, fltrS, connMgr, cacheConns) + lds := map[string][]LoaderData{ + "DispatcherHosts": { + { + "Tenant": "cgrates.org", + "ID": "dispatcherHostsID", + "Address": "192.168.100.1", + }, + }, + } + startTime := time.Now() + if err := ldr.storeLoadedData(utils.MetaDispatcherHosts, lds, utils.MetaReload); err != nil { + t.Error(err) + } + elapsedTime := time.Since(startTime) + expectedDuration := 1 * time.Second + if elapsedTime < expectedDuration || elapsedTime >= 2*time.Second { + t.Errorf("Expected elapsed time of at least %v, but got %v", expectedDuration, elapsedTime) + } +} diff --git a/loaders/loaders.go b/loaders/loaders.go index c1c8b0654..3dc33c1ff 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -30,12 +31,12 @@ import ( ) func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg, - timezone string, filterS *engine.FilterS, + timezone string, cachingDlay time.Duration, filterS *engine.FilterS, connMgr *engine.ConnManager) (ldrS *LoaderService) { ldrS = &LoaderService{ldrs: make(map[string]*Loader)} for _, ldrCfg := range ldrsCfg { if ldrCfg.Enabled { - ldrS.ldrs[ldrCfg.ID] = NewLoader(dm, ldrCfg, timezone, filterS, connMgr, ldrCfg.CacheSConns) + ldrS.ldrs[ldrCfg.ID] = NewLoader(dm, ldrCfg, timezone, cachingDlay, filterS, connMgr, ldrCfg.CacheSConns) } } return @@ -137,12 +138,12 @@ func (ldrS *LoaderService) V1Remove(ctx *context.Context, args *ArgsProcessFolde // Reload recreates the loaders map thread safe func (ldrS *LoaderService) Reload(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg, - timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager) { + timezone string, cachingDlay time.Duration, filterS *engine.FilterS, connMgr *engine.ConnManager) { ldrS.Lock() ldrS.ldrs = make(map[string]*Loader) for _, ldrCfg := range ldrsCfg { if ldrCfg.Enabled { - ldrS.ldrs[ldrCfg.ID] = NewLoader(dm, ldrCfg, timezone, filterS, connMgr, ldrCfg.CacheSConns) + ldrS.ldrs[ldrCfg.ID] = NewLoader(dm, ldrCfg, timezone, cachingDlay, filterS, connMgr, ldrCfg.CacheSConns) } } ldrS.Unlock() diff --git a/loaders/loaders_it_test.go b/loaders/loaders_it_test.go index 6503e445d..ff42a30d3 100644 --- a/loaders/loaders_it_test.go +++ b/loaders/loaders_it_test.go @@ -120,7 +120,7 @@ cgrates.org,NewRes1`)); err != nil { for _, tmp := range cfg[0].Data[0].Fields { tmp.ComputePath() } - ldrs := NewLoaderService(dm, cfg, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfg, "UTC", 0, nil, nil) var reply string expected := "ANOTHER_LOADER_RUNNING" @@ -183,7 +183,7 @@ cgrates.org,NewRes1 } var reply string - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) if err := ldrs.V1Load(context.Background(), &ArgsProcessFolder{ LoaderID: utils.EmptyString}, &reply); err == nil && reply != utils.EmptyString && err.Error() != utils.EmptyString { t.Errorf("Expected %+v and %+v \n, received %+v and %+v", utils.EmptyString, utils.EmptyString, err, reply) @@ -219,7 +219,7 @@ func testV1LoadUnableToDeleteFile(t *testing.T) { Data: nil, } var reply string - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) expected := "SERVER_ERROR: stat /\x00/Resources.csv: invalid argument" if err := ldrs.V1Load(context.Background(), &ArgsProcessFolder{ @@ -262,7 +262,7 @@ NOT_UINT LockFilePath: utils.ResourcesCsv, Data: nil, } - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) ldrs.ldrs["testV1LoadResource"].dataTpls = map[string][]*config.FCTemplate{ utils.MetaFilters: { {Tag: "PK", @@ -359,7 +359,7 @@ cgrates.org,NewRes1`)) for _, tmp := range cfg[0].Data[0].Fields { tmp.ComputePath() } - ldrs := NewLoaderService(dm, cfg, time.UTC.String(), nil, nil) + ldrs := NewLoaderService(dm, cfg, time.UTC.String(), 0, nil, nil) //To remove a resource, we need to set it first if err := dm.SetResourceProfile(&engine.ResourceProfile{ Tenant: "cgrates.org", @@ -423,7 +423,7 @@ cgrates.org,NewRes1 } var reply string - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) expected := "UNKNOWN_LOADER: *default" if err := ldrs.V1Remove(context.Background(), &ArgsProcessFolder{ LoaderID: utils.EmptyString}, &reply); err == nil || reply != utils.EmptyString || err.Error() != expected { @@ -460,7 +460,7 @@ func testV1RemoveUnableToDeleteFile(t *testing.T) { Data: nil, } var reply string - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) expected := "SERVER_ERROR: stat /\x00/Resources.csv: invalid argument" if err := ldrs.V1Remove(context.Background(), &ArgsProcessFolder{ @@ -497,7 +497,7 @@ func testV1LoadAndRemoveProcessRemoveFolderError(t *testing.T) { TpOutDir: "/tmp", Data: nil, } - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) ldrs.ldrs["testV1RemoveProcessFolderError"].lockFilepath = flPath @@ -558,7 +558,7 @@ func testV1RemoveProcessFolderError(t *testing.T) { LockFilePath: "notResource.csv", Data: nil, } - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) + ldrs := NewLoaderService(dm, cfgLdr, "UTC", 0, nil, nil) ldrs.ldrs["testV1RemoveProcessFolderError"].rdrs = map[string]map[string]*openedCSVFile{ utils.MetaResources: { "not_a_file2": &openedCSVFile{ @@ -624,7 +624,7 @@ func testLoaderServiceReload(t *testing.T) { Data: nil, } ldrs := &LoaderService{} - ldrs.Reload(dm, cfgLdr, "UTC", nil, nil) + ldrs.Reload(dm, cfgLdr, "UTC", 0, nil, nil) if ldrs.ldrs == nil { t.Error("Expected to be populated") } diff --git a/services/loaders.go b/services/loaders.go index fea2e845e..231bda277 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -80,7 +80,7 @@ func (ldrs *LoaderService) Start() error { defer ldrs.Unlock() ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(), - ldrs.cfg.GeneralCfg().DefaultTimezone, filterS, ldrs.connMgr) + ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.cfg.GeneralCfg().CachingDelay, filterS, ldrs.connMgr) if !ldrs.ldrs.Enabled() { return nil @@ -113,7 +113,7 @@ func (ldrs *LoaderService) Reload() (err error) { ldrs.RLock() - ldrs.ldrs.Reload(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, + ldrs.ldrs.Reload(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.cfg.GeneralCfg().CachingDelay, filterS, ldrs.connMgr) if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil { return diff --git a/services/loaders_test.go b/services/loaders_test.go index f93563c0a..d261dd1d0 100644 --- a/services/loaders_test.go +++ b/services/loaders_test.go @@ -66,7 +66,7 @@ func TestLoaderSCoverage(t *testing.T) { TpInDir: "", TpOutDir: "", Data: nil, - }}, "", + }}, "", 0, &engine.FilterS{}, nil) if !srv.IsRunning() { t.Errorf("Expected service to be running")