mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Started to remove scheduler
This commit is contained in:
committed by
Dan Christian Bogos
parent
ed6a0053aa
commit
58ea63eac3
@@ -31,24 +31,17 @@ import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// SchedulerGeter used to avoid ciclic dependency
|
||||
type SchedulerGeter interface {
|
||||
GetScheduler() *scheduler.Scheduler
|
||||
}
|
||||
|
||||
type APIerSv1 struct {
|
||||
StorDb engine.LoadStorage // we should consider keeping only one of StorDB type
|
||||
CdrDb engine.CdrStorage
|
||||
DataManager *engine.DataManager
|
||||
Config *config.CGRConfig
|
||||
Responder *engine.Responder
|
||||
SchedulerService SchedulerGeter // Need to have them capitalize so we can export in V2
|
||||
FilterS *engine.FilterS //Used for CDR Exporter
|
||||
ConnMgr *engine.ConnManager
|
||||
StorDb engine.LoadStorage // we should consider keeping only one of StorDB type
|
||||
CdrDb engine.CdrStorage
|
||||
DataManager *engine.DataManager
|
||||
Config *config.CGRConfig
|
||||
Responder *engine.Responder
|
||||
FilterS *engine.FilterS //Used for CDR Exporter
|
||||
ConnMgr *engine.ConnManager
|
||||
|
||||
StorDBChan chan engine.StorDB
|
||||
ResponderChan chan *engine.Responder
|
||||
@@ -261,7 +254,7 @@ func (apierSv1 *APIerSv1) LoadDestination(attrs *AttrLoadDestination, reply *str
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone, apierSv1.Config.ApierCfg().CachesConns,
|
||||
apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -293,7 +286,7 @@ func (apierSv1 *APIerSv1) LoadRatingPlan(attrs *AttrLoadRatingPlan, reply *strin
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -317,7 +310,7 @@ func (apierSv1 *APIerSv1) LoadRatingProfile(attrs *utils.TPRatingProfile, reply
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -347,7 +340,7 @@ func (apierSv1 *APIerSv1) LoadSharedGroup(attrs *AttrLoadSharedGroup, reply *str
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -374,7 +367,7 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromStorDb(attrs *AttrLoadTpFromStorDb,
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -405,7 +398,7 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromStorDb(attrs *AttrLoadTpFromStorDb,
|
||||
if err := dbReader.ReloadCache(caching, true, attrs.APIOpts); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(apierSv1.Config.ApierCfg().SchedulerConns) != 0 {
|
||||
if len(apierSv1.Config.ApierCfg().ActionConns) != 0 {
|
||||
utils.Logger.Info("APIerSv1.LoadTariffPlanFromStorDb, reloading scheduler.")
|
||||
if err := dbReader.ReloadScheduler(true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -579,130 +572,10 @@ type V1TPAction struct {
|
||||
Weight float64 // Action's weight
|
||||
}
|
||||
|
||||
func (apierSv1 *APIerSv1) SetActions(attrs *V1AttrSetActions, reply *string) (err error) {
|
||||
if missing := utils.MissingStructFields(attrs, []string{"ActionsId", "Actions"}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
for _, action := range attrs.Actions {
|
||||
requiredFields := []string{"Identifier", "Weight"}
|
||||
if action.BalanceType != utils.EmptyString { // Add some inter-dependent parameters - if balanceType then we are not talking about simply calling actions
|
||||
requiredFields = append(requiredFields, "Units")
|
||||
}
|
||||
if missing := utils.MissingStructFields(action, requiredFields); len(missing) != 0 {
|
||||
return fmt.Errorf("%s:Action:%s:%v", utils.ErrMandatoryIeMissing.Error(), action.Identifier, missing)
|
||||
}
|
||||
}
|
||||
if !attrs.Overwrite {
|
||||
if exists, err := apierSv1.DataManager.HasData(utils.ActionPrefix, attrs.ActionsId, ""); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
} else if exists {
|
||||
return utils.ErrExists
|
||||
}
|
||||
}
|
||||
storeActions := make(engine.Actions, len(attrs.Actions))
|
||||
for idx, apiAct := range attrs.Actions {
|
||||
var blocker *bool
|
||||
if apiAct.BalanceBlocker != utils.EmptyString {
|
||||
if x, err := strconv.ParseBool(apiAct.BalanceBlocker); err == nil {
|
||||
blocker = &x
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var disabled *bool
|
||||
if apiAct.BalanceDisabled != utils.EmptyString {
|
||||
if x, err := strconv.ParseBool(apiAct.BalanceDisabled); err == nil {
|
||||
disabled = &x
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
a := &engine.Action{
|
||||
Id: attrs.ActionsId,
|
||||
ActionType: apiAct.Identifier,
|
||||
Weight: apiAct.Weight,
|
||||
ExpirationString: apiAct.ExpiryTime,
|
||||
ExtraParameters: apiAct.ExtraParameters,
|
||||
Filter: apiAct.Filter,
|
||||
Balance: &engine.BalanceFilter{ // TODO: update this part
|
||||
Uuid: utils.StringPointer(apiAct.BalanceUuid),
|
||||
ID: utils.StringPointer(apiAct.BalanceId),
|
||||
Type: utils.StringPointer(apiAct.BalanceType),
|
||||
Value: &utils.ValueFormula{Static: apiAct.Units},
|
||||
Weight: apiAct.BalanceWeight,
|
||||
DestinationIDs: utils.StringMapPointer(utils.ParseStringMap(apiAct.DestinationIds)),
|
||||
RatingSubject: utils.StringPointer(apiAct.RatingSubject),
|
||||
SharedGroups: utils.StringMapPointer(utils.ParseStringMap(apiAct.SharedGroups)),
|
||||
Categories: utils.StringMapPointer(utils.ParseStringMap(apiAct.Categories)),
|
||||
TimingIDs: utils.StringMapPointer(utils.ParseStringMap(apiAct.TimingTags)),
|
||||
Blocker: blocker,
|
||||
Disabled: disabled,
|
||||
},
|
||||
}
|
||||
storeActions[idx] = a
|
||||
}
|
||||
if err := apierSv1.DataManager.SetActions(attrs.ActionsId, storeActions, utils.NonTransactional); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
//CacheReload
|
||||
if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
|
||||
utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithAPIOpts{
|
||||
ArgsCache: map[string][]string{utils.ActionIDs: {attrs.ActionsId}},
|
||||
}, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
//generate a loadID for CacheActions and store it in database
|
||||
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheActions: time.Now().UnixNano()}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Retrieves actions attached to specific ActionsId within cache
|
||||
func (apierSv1 *APIerSv1) GetActions(actsId *string, reply *[]*utils.TPAction) error {
|
||||
if len(*actsId) == 0 {
|
||||
return fmt.Errorf("%s ActionsId: %s", utils.ErrMandatoryIeMissing.Error(), *actsId)
|
||||
}
|
||||
acts := make([]*utils.TPAction, 0)
|
||||
engActs, err := apierSv1.DataManager.GetActions(*actsId, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
for _, engAct := range engActs {
|
||||
act := &utils.TPAction{
|
||||
Identifier: engAct.ActionType,
|
||||
ExpiryTime: engAct.ExpirationString,
|
||||
ExtraParameters: engAct.ExtraParameters,
|
||||
Filter: engAct.Filter,
|
||||
Weight: engAct.Weight,
|
||||
}
|
||||
bf := engAct.Balance
|
||||
if bf != nil {
|
||||
act.BalanceType = bf.GetType()
|
||||
act.Units = strconv.FormatFloat(bf.GetValue(), 'f', -1, 64)
|
||||
act.DestinationIds = bf.GetDestinationIDs().String()
|
||||
act.RatingSubject = bf.GetRatingSubject()
|
||||
act.SharedGroups = bf.GetSharedGroups().String()
|
||||
act.BalanceWeight = strconv.FormatFloat(bf.GetWeight(), 'f', -1, 64)
|
||||
act.TimingTags = bf.GetTimingIDs().String()
|
||||
act.BalanceId = bf.GetID()
|
||||
act.Categories = bf.GetCategories().String()
|
||||
act.BalanceBlocker = strconv.FormatBool(bf.GetBlocker())
|
||||
act.BalanceDisabled = strconv.FormatBool(bf.GetDisabled())
|
||||
}
|
||||
acts = append(acts, act)
|
||||
}
|
||||
*reply = acts
|
||||
return nil
|
||||
}
|
||||
|
||||
type AttrSetActionPlan struct {
|
||||
Id string // Profile id
|
||||
ActionPlan []*AttrActionPlan // Set of actions this Actions profile will perform
|
||||
Overwrite bool // If previously defined, will be overwritten
|
||||
ReloadScheduler bool // Enables automatic reload of the scheduler (eg: useful when adding a single action timing)
|
||||
Id string // Profile id
|
||||
ActionPlan []*AttrActionPlan // Set of actions this Actions profile will perform
|
||||
Overwrite bool // If previously defined, will be overwritten
|
||||
}
|
||||
|
||||
type AttrActionPlan struct {
|
||||
@@ -818,13 +691,6 @@ func (apierSv1 *APIerSv1) SetActionPlan(attrs *AttrSetActionPlan, reply *string)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if attrs.ReloadScheduler {
|
||||
sched := apierSv1.SchedulerService.GetScheduler()
|
||||
if sched == nil {
|
||||
return errors.New(utils.SchedulerNotRunningCaps)
|
||||
}
|
||||
sched.Reload()
|
||||
}
|
||||
//generate a loadID for CacheActionPlans and store it in database
|
||||
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheActionPlans: time.Now().UnixNano()}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
@@ -1014,7 +880,7 @@ func (apierSv1 *APIerSv1) LoadAccountActions(attrs *utils.TPAccountActions, repl
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1026,10 +892,6 @@ func (apierSv1 *APIerSv1) LoadAccountActions(attrs *utils.TPAccountActions, repl
|
||||
}
|
||||
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache
|
||||
// Need to do it before scheduler otherwise actions to run will be unknown
|
||||
sched := apierSv1.SchedulerService.GetScheduler()
|
||||
if sched != nil {
|
||||
sched.Reload()
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
@@ -1053,7 +915,7 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromFolder(attrs *utils.AttrLoadTpFromFo
|
||||
loader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(),
|
||||
engine.NewFileCSVStorage(utils.CSVSep, attrs.FolderPath),
|
||||
"", apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1087,7 +949,7 @@ func (apierSv1 *APIerSv1) LoadTariffPlanFromFolder(attrs *utils.AttrLoadTpFromFo
|
||||
if err := loader.ReloadCache(caching, true, attrs.APIOpts); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(apierSv1.Config.ApierCfg().SchedulerConns) != 0 {
|
||||
if len(apierSv1.Config.ApierCfg().ActionConns) != 0 {
|
||||
utils.Logger.Info("APIerSv1.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
if err := loader.ReloadScheduler(true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1119,7 +981,7 @@ func (apierSv1 *APIerSv1) RemoveTPFromFolder(attrs *utils.AttrLoadTpFromFolder,
|
||||
// create the TpReader
|
||||
loader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(),
|
||||
engine.NewFileCSVStorage(utils.CSVSep, attrs.FolderPath), "", apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1153,7 +1015,7 @@ func (apierSv1 *APIerSv1) RemoveTPFromFolder(attrs *utils.AttrLoadTpFromFolder,
|
||||
if err := loader.ReloadCache(caching, true, attrs.APIOpts); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(apierSv1.Config.ApierCfg().SchedulerConns) != 0 {
|
||||
if len(apierSv1.Config.ApierCfg().ActionConns) != 0 {
|
||||
utils.Logger.Info("APIerSv1.RemoveTPFromFolder, reloading scheduler.")
|
||||
if err := loader.ReloadScheduler(true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1173,7 +1035,7 @@ func (apierSv1 *APIerSv1) RemoveTPFromStorDB(attrs *AttrLoadTpFromStorDb, reply
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apierSv1.DataManager.DataDB(), apierSv1.StorDb,
|
||||
attrs.TPid, apierSv1.Config.GeneralCfg().DefaultTimezone,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().SchedulerConns,
|
||||
apierSv1.Config.ApierCfg().CachesConns, apierSv1.Config.ApierCfg().ActionConns,
|
||||
apierSv1.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -1205,7 +1067,7 @@ func (apierSv1 *APIerSv1) RemoveTPFromStorDB(attrs *AttrLoadTpFromStorDb, reply
|
||||
if err := dbReader.ReloadCache(caching, true, attrs.APIOpts); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(apierSv1.Config.ApierCfg().SchedulerConns) != 0 {
|
||||
if len(apierSv1.Config.ApierCfg().ActionConns) != 0 {
|
||||
utils.Logger.Info("APIerSv1.RemoveTPFromStorDB, reloading scheduler.")
|
||||
if err := dbReader.ReloadScheduler(true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
@@ -28,8 +28,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/dispatchers"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -330,7 +328,6 @@ func testAPIerSetActionPlanDfltTime(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
},
|
||||
},
|
||||
ReloadScheduler: true,
|
||||
}
|
||||
if err := apierRPC.Call(utils.APIerSv1SetActionPlan, &hourlyAP, &reply1); err != nil {
|
||||
t.Error("Got error on APIerSv1.SetActionPlan: ", err.Error())
|
||||
@@ -346,7 +343,6 @@ func testAPIerSetActionPlanDfltTime(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
},
|
||||
},
|
||||
ReloadScheduler: true,
|
||||
}
|
||||
if err := apierRPC.Call(utils.APIerSv1SetActionPlan, &dailyAP, &reply1); err != nil {
|
||||
t.Error("Got error on APIerSv1.SetActionPlan: ", err.Error())
|
||||
@@ -362,7 +358,6 @@ func testAPIerSetActionPlanDfltTime(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
},
|
||||
},
|
||||
ReloadScheduler: true,
|
||||
}
|
||||
if err := apierRPC.Call(utils.APIerSv1SetActionPlan, &weeklyAP, &reply1); err != nil {
|
||||
t.Error("Got error on APIerSv1.SetActionPlan: ", err.Error())
|
||||
@@ -378,50 +373,12 @@ func testAPIerSetActionPlanDfltTime(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
},
|
||||
},
|
||||
ReloadScheduler: true,
|
||||
}
|
||||
if err := apierRPC.Call(utils.APIerSv1SetActionPlan, &monthlyAP, &reply1); err != nil {
|
||||
t.Error("Got error on APIerSv1.SetActionPlan: ", err.Error())
|
||||
} else if reply1 != utils.OK {
|
||||
t.Errorf("Calling APIerSv1.SetActionPlan received: %s", reply1)
|
||||
}
|
||||
var rply []*scheduler.ScheduledAction
|
||||
if err := apierRPC.Call(utils.APIerSv1GetScheduledActions,
|
||||
scheduler.ArgsGetScheduledActions{}, &rply); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
} else {
|
||||
for _, schedAct := range rply {
|
||||
switch schedAct.ActionPlanID {
|
||||
case "AP_WEEKLY":
|
||||
t1 := time.Now().AddDate(0, 0, 7)
|
||||
if schedAct.NextRunTime.Before(t1.Add(-2*time.Second)) ||
|
||||
schedAct.NextRunTime.After(t1.Add(time.Second)) {
|
||||
t.Errorf("Expected the nextRuntime to be after 1 week,but received: <%+v>", utils.ToJSON(schedAct))
|
||||
}
|
||||
case "AP_DAILY":
|
||||
t1 := time.Now().AddDate(0, 0, 1)
|
||||
if schedAct.NextRunTime.Before(t1.Add(-2*time.Second)) ||
|
||||
schedAct.NextRunTime.After(t1.Add(time.Second)) {
|
||||
t.Errorf("Expected the nextRuntime to be after 1 day,but received: <%+v>", utils.ToJSON(schedAct))
|
||||
}
|
||||
case "AP_HOURLY":
|
||||
if schedAct.NextRunTime.Before(time.Now().Add(59*time.Minute+58*time.Second)) ||
|
||||
schedAct.NextRunTime.After(time.Now().Add(time.Hour+time.Second)) {
|
||||
t.Errorf("Expected the nextRuntime to be after 1 hour,but received: <%+v>", utils.ToJSON(schedAct))
|
||||
}
|
||||
case "AP_MONTHLY":
|
||||
// *monthly needs to mach exactly the day
|
||||
tnow := time.Now()
|
||||
expected := tnow.AddDate(0, 1, 0)
|
||||
expected = time.Date(expected.Year(), expected.Month(), tnow.Day(), tnow.Hour(),
|
||||
tnow.Minute(), tnow.Second(), 0, schedAct.NextRunTime.Location())
|
||||
if schedAct.NextRunTime.Before(expected.Add(-time.Second)) ||
|
||||
schedAct.NextRunTime.After(expected.Add(time.Second)) {
|
||||
t.Errorf("Expected the nextRuntime to be after 1 month,but received: <%+v>", utils.ToJSON(schedAct))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testAPIerLoadRatingPlan(t *testing.T) {
|
||||
|
||||
@@ -1,110 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package v1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
/*
|
||||
[
|
||||
{
|
||||
u'ActionsId': u'BONUS_1',
|
||||
u'Uuid': u'5b5ba53b40b1d44380cce52379ec5c0d',
|
||||
u'Weight': 10,
|
||||
u'Timing': {
|
||||
u'Timing': {
|
||||
u'MonthDays': [
|
||||
|
||||
],
|
||||
u'Months': [
|
||||
|
||||
],
|
||||
u'WeekDays': [
|
||||
|
||||
],
|
||||
u'Years': [
|
||||
2013
|
||||
],
|
||||
u'StartTime': u'11: 00: 00',
|
||||
u'EndTime': u''
|
||||
},
|
||||
u'Rating': None,
|
||||
u'Weight': 0
|
||||
},
|
||||
u'AccountIds': [
|
||||
u'*out: cgrates.org: 1001',
|
||||
u'*out: cgrates.org: 1002',
|
||||
u'*out: cgrates.org: 1003',
|
||||
u'*out: cgrates.org: 1004',
|
||||
u'*out: cgrates.org: 1005'
|
||||
],
|
||||
u'Id': u'PREPAID_10'
|
||||
},
|
||||
{
|
||||
u'ActionsId': u'PREPAID_10',
|
||||
u'Uuid': u'b16ab12740e2e6c380ff7660e8b55528',
|
||||
u'Weight': 10,
|
||||
u'Timing': {
|
||||
u'Timing': {
|
||||
u'MonthDays': [
|
||||
|
||||
],
|
||||
u'Months': [
|
||||
|
||||
],
|
||||
u'WeekDays': [
|
||||
|
||||
],
|
||||
u'Years': [
|
||||
2013
|
||||
],
|
||||
u'StartTime': u'11: 00: 00',
|
||||
u'EndTime': u''
|
||||
},
|
||||
u'Rating': None,
|
||||
u'Weight': 0
|
||||
},
|
||||
u'AccountIds': [
|
||||
u'*out: cgrates.org: 1001',
|
||||
u'*out: cgrates.org: 1002',
|
||||
u'*out: cgrates.org: 1003',
|
||||
u'*out: cgrates.org: 1004',
|
||||
u'*out: cgrates.org: 1005'
|
||||
],
|
||||
u'Id': u'PREPAID_10'
|
||||
}
|
||||
]
|
||||
*/
|
||||
|
||||
func (apierSv1 *APIerSv1) GetScheduledActions(args *scheduler.ArgsGetScheduledActions, reply *[]*scheduler.ScheduledAction) error {
|
||||
sched := apierSv1.SchedulerService.GetScheduler()
|
||||
if sched == nil {
|
||||
return errors.New(utils.SchedulerNotRunningCaps)
|
||||
}
|
||||
rpl := sched.GetScheduledActions(*args)
|
||||
if len(rpl) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
*reply = rpl
|
||||
return nil
|
||||
}
|
||||
@@ -1,182 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package v1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewSchedulerSv1 retuns the API for SchedulerS
|
||||
func NewSchedulerSv1(cgrcfg *config.CGRConfig, dm *engine.DataManager) *SchedulerSv1 {
|
||||
return &SchedulerSv1{cgrcfg: cgrcfg, dm: dm}
|
||||
}
|
||||
|
||||
// SchedulerSv1 is the RPC object implementing scheduler APIs
|
||||
type SchedulerSv1 struct {
|
||||
cgrcfg *config.CGRConfig
|
||||
dm *engine.DataManager
|
||||
}
|
||||
|
||||
// Reload reloads scheduler instructions
|
||||
func (schdSv1 *SchedulerSv1) Reload(arg *utils.CGREvent, reply *string) error {
|
||||
schdSv1.cgrcfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecuteActions execute an actionPlan or multiple actionsPlans between a time interval
|
||||
func (schdSv1 *SchedulerSv1) ExecuteActions(attr *utils.AttrsExecuteActions, reply *string) error {
|
||||
if attr.ActionPlanID != utils.EmptyString { // execute by ActionPlanID
|
||||
apl, err := schdSv1.dm.GetActionPlan(attr.ActionPlanID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
if apl != nil {
|
||||
// order by weight
|
||||
engine.ActionTimingWeightOnlyPriorityList(apl.ActionTimings).Sort()
|
||||
for _, at := range apl.ActionTimings {
|
||||
if at.IsASAP() {
|
||||
continue
|
||||
}
|
||||
|
||||
at.SetAccountIDs(apl.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(apl.Id)
|
||||
err := at.Execute(nil, nil)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Force Scheduler> Executing action %s ", at.ActionsID))
|
||||
}
|
||||
}
|
||||
}
|
||||
if !attr.TimeStart.IsZero() && !attr.TimeEnd.IsZero() { // execute between two dates
|
||||
actionPlans, err := schdSv1.dm.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
err := fmt.Errorf("cannot get action plans: %v", err)
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
|
||||
// recreate the queue
|
||||
queue := engine.ActionTimingPriorityList{}
|
||||
for _, actionPlan := range actionPlans {
|
||||
for _, at := range actionPlan.ActionTimings {
|
||||
if at.Timing == nil {
|
||||
continue
|
||||
}
|
||||
if at.IsASAP() {
|
||||
continue
|
||||
}
|
||||
if at.GetNextStartTime(attr.TimeStart).Before(attr.TimeStart) {
|
||||
// the task is obsolete, do not add it to the queue
|
||||
continue
|
||||
}
|
||||
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(actionPlan.Id)
|
||||
at.ResetStartTimeCache()
|
||||
queue = append(queue, at)
|
||||
}
|
||||
}
|
||||
sort.Sort(queue)
|
||||
// start playback execution loop
|
||||
current := attr.TimeStart
|
||||
for len(queue) > 0 && current.Before(attr.TimeEnd) {
|
||||
a0 := queue[0]
|
||||
current = a0.GetNextStartTime(current)
|
||||
if current.Before(attr.TimeEnd) || current.Equal(attr.TimeEnd) {
|
||||
utils.Logger.Info(fmt.Sprintf("<Replay Scheduler> Executing action %s for time %v", a0.ActionsID, current))
|
||||
err := a0.Execute(nil, nil)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
// if after execute the next start time is in the past then
|
||||
// do not add it to the queue
|
||||
a0.ResetStartTimeCache()
|
||||
current = current.Add(time.Second)
|
||||
start := a0.GetNextStartTime(current)
|
||||
if start.Before(current) || start.After(attr.TimeEnd) {
|
||||
queue = queue[1:]
|
||||
} else {
|
||||
queue = append(queue, a0)
|
||||
queue = queue[1:]
|
||||
sort.Sort(queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecuteActionPlans execute multiple actionPlans one by one
|
||||
func (schdSv1 *SchedulerSv1) ExecuteActionPlans(attr *utils.AttrsExecuteActionPlans, reply *string) (err error) {
|
||||
// try get account
|
||||
// if not exist set in DM
|
||||
accID := utils.ConcatenatedKey(attr.Tenant, attr.AccountID)
|
||||
if _, err = schdSv1.dm.GetAccount(accID); err != nil {
|
||||
// create account if does not exist
|
||||
account := &engine.Account{
|
||||
ID: accID,
|
||||
}
|
||||
if err = schdSv1.dm.SetAccount(account); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apID := range attr.ActionPlanIDs {
|
||||
apl, err := schdSv1.dm.GetActionPlan(apID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
if apl != nil {
|
||||
// order by weight
|
||||
engine.ActionTimingWeightOnlyPriorityList(apl.ActionTimings).Sort()
|
||||
for _, at := range apl.ActionTimings {
|
||||
at.SetAccountIDs(utils.NewStringMap(accID))
|
||||
err := at.Execute(nil, nil)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Force Scheduler> Executing action %s ", at.ActionsID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ping returns Pong
|
||||
func (schdSv1 *SchedulerSv1) Ping(ign *utils.CGREvent, reply *string) error {
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
func (schdSv1 *SchedulerSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(schdSv1, serviceMethod, args, reply)
|
||||
}
|
||||
@@ -1,332 +0,0 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package v1
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
)
|
||||
|
||||
var (
|
||||
schedCfgPath string
|
||||
schedCfg *config.CGRConfig
|
||||
schedRpc *rpc.Client
|
||||
schedConfDIR string //run tests for specific configuration
|
||||
)
|
||||
|
||||
var sTestsSchedFiltered = []func(t *testing.T){
|
||||
testSchedLoadConfig,
|
||||
testSchedInitDataDb,
|
||||
testSchedResetStorDb,
|
||||
testSchedStartEngine,
|
||||
testSchedRpcConn,
|
||||
testSchedFromFolder,
|
||||
testSchedVeifyAllAccounts,
|
||||
testSchedVeifyAccount1001,
|
||||
testSchedVeifyAccount1002and1003,
|
||||
testSchedExecuteAction,
|
||||
testSchedStopEngine,
|
||||
}
|
||||
|
||||
//TestSchedWithoutFilters will execute action for all accounts
|
||||
func TestSchedWithoutFilters(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
schedConfDIR = "tutinternal"
|
||||
case utils.MetaMySQL:
|
||||
schedConfDIR = "tutmysql"
|
||||
case utils.MetaMongo:
|
||||
schedConfDIR = "tutmongo"
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
|
||||
for _, stest := range sTestsSchedFiltered {
|
||||
t.Run(schedConfDIR, stest)
|
||||
}
|
||||
}
|
||||
|
||||
//TestSchedWithFiltersSingleAccount will execute actions only for account 1001
|
||||
func TestSchedWithFiltersSingleAccount(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
schedConfDIR = "filtered_scheduler_internal"
|
||||
case utils.MetaMySQL:
|
||||
schedConfDIR = "filtered_scheduler_mysql"
|
||||
case utils.MetaMongo:
|
||||
schedConfDIR = "filtered_scheduler_mongo"
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
for _, stest := range sTestsSchedFiltered {
|
||||
t.Run(schedConfDIR, stest)
|
||||
}
|
||||
}
|
||||
|
||||
//TestSchedWithFilters2 will execute actions for accounts 1002 and 1003 ( 1001 will be skiped )
|
||||
func TestSchedWithFilters2(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
schedConfDIR = "filtered_scheduler2_internal"
|
||||
case utils.MetaMySQL:
|
||||
schedConfDIR = "filtered_scheduler2_mysql"
|
||||
case utils.MetaMongo:
|
||||
schedConfDIR = "filtered_scheduler2_mongo"
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
for _, stest := range sTestsSchedFiltered {
|
||||
t.Run(schedConfDIR, stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedLoadConfig(t *testing.T) {
|
||||
var err error
|
||||
schedCfgPath = path.Join(*dataDir, "conf", "samples", schedConfDIR)
|
||||
if schedCfg, err = config.NewCGRConfigFromPath(schedCfgPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedInitDataDb(t *testing.T) {
|
||||
if err := engine.InitDataDb(schedCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedResetStorDb(t *testing.T) {
|
||||
if err := engine.InitStorDb(schedCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(schedCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedRpcConn(t *testing.T) {
|
||||
var err error
|
||||
schedRpc, err = newRPCClient(schedCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedFromFolder(t *testing.T) {
|
||||
var reply string
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := schedRpc.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func testSchedVeifyAllAccounts(t *testing.T) {
|
||||
if !(schedConfDIR == "tutinternal" || schedConfDIR == "tutmysql" || schedConfDIR == "tutmongo") {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
var acnt *engine.Account
|
||||
attrs := &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1001",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1002",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1003",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedVeifyAccount1001(t *testing.T) {
|
||||
if !(schedConfDIR == "filtered_scheduler_internal" || schedConfDIR == "filtered_scheduler_mysql" || schedConfDIR == "filtered_scheduler_mongo") {
|
||||
t.SkipNow()
|
||||
}
|
||||
var acnt *engine.Account
|
||||
attrs := &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1001",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
|
||||
acnt = nil // in case of gob ( it doesn't update the empty fields)
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1002",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if lenBal := len(acnt.BalanceMap[utils.MetaMonetary]); lenBal != 0 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
0, lenBal)
|
||||
}
|
||||
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1003",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if lenBal := len(acnt.BalanceMap[utils.MetaMonetary]); lenBal != 0 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
0, lenBal)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testSchedVeifyAccount1002and1003(t *testing.T) {
|
||||
if !(schedConfDIR == "filtered_scheduler2_internal" || schedConfDIR == "filtered_scheduler2_mysql" || schedConfDIR == "filtered_scheduler2_mongo") {
|
||||
t.SkipNow()
|
||||
}
|
||||
var acnt *engine.Account
|
||||
attrs := &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1001",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if lenBal := len(acnt.BalanceMap[utils.MetaMonetary]); lenBal != 0 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
0, lenBal)
|
||||
}
|
||||
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1002",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
|
||||
attrs = &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "1003",
|
||||
}
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != 10 {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
10, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedExecuteAction(t *testing.T) {
|
||||
if !(schedConfDIR == "tutinternal" || schedConfDIR == "tutmysql" || schedConfDIR == "tutmongo") {
|
||||
t.SkipNow()
|
||||
}
|
||||
// set a new ActionPlan
|
||||
var reply1 string
|
||||
if err := schedRpc.Call(utils.APIerSv1SetActionPlan, &AttrSetActionPlan{
|
||||
Id: "CustomAP",
|
||||
ActionPlan: []*AttrActionPlan{
|
||||
{
|
||||
ActionsId: "ACT_TOPUP_RST_10",
|
||||
Time: utils.MetaHourly,
|
||||
Weight: 20.0},
|
||||
},
|
||||
}, &reply1); err != nil {
|
||||
t.Error("Got error on APIerSv1.SetActionPlan: ", err.Error())
|
||||
} else if reply1 != utils.OK {
|
||||
t.Errorf("Unexpected reply returned: %s", reply1)
|
||||
}
|
||||
var reply string
|
||||
if err := schedRpc.Call(utils.APIerSv1SetAccount, utils.AttrSetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "CustomAccount",
|
||||
ActionPlanID: "CustomAP",
|
||||
}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var acnt *engine.Account
|
||||
attrs := &utils.AttrGetAccount{
|
||||
Tenant: "cgrates.org",
|
||||
Account: "CustomAccount",
|
||||
}
|
||||
expected := 0.0
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != expected {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
expected, rply)
|
||||
}
|
||||
|
||||
if err := schedRpc.Call(utils.SchedulerSv1ExecuteActions, &utils.AttrsExecuteActions{ActionPlanID: "CustomAP"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expected = 10.0
|
||||
if err := schedRpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
|
||||
t.Error(err)
|
||||
} else if rply := acnt.BalanceMap[utils.MetaMonetary].GetTotalValue(); rply != expected {
|
||||
t.Errorf("Expecting: %v, received: %v",
|
||||
expected, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func testSchedStopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -56,7 +56,7 @@ func (apiv2 *APIerSv2) LoadRatingProfile(attrs *AttrLoadRatingProfile, reply *st
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apiv2.DataManager.DataDB(), apiv2.StorDb,
|
||||
attrs.TPid, apiv2.Config.GeneralCfg().DefaultTimezone,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().SchedulerConns,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().ActionConns,
|
||||
apiv2.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -86,7 +86,7 @@ func (apiv2 *APIerSv2) LoadAccountActions(attrs *AttrLoadAccountActions, reply *
|
||||
}
|
||||
dbReader, err := engine.NewTpReader(apiv2.DataManager.DataDB(), apiv2.StorDb,
|
||||
attrs.TPid, apiv2.Config.GeneralCfg().DefaultTimezone,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().SchedulerConns,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().ActionConns,
|
||||
apiv2.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -98,9 +98,9 @@ func (apiv2 *APIerSv2) LoadAccountActions(attrs *AttrLoadAccountActions, reply *
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, attrs.AccountActionsId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
sched := apiv2.SchedulerService.GetScheduler()
|
||||
if sched != nil {
|
||||
sched.Reload()
|
||||
acts := apiv2.ActionService.GetAction()
|
||||
if acts != nil {
|
||||
//acts.Reload()
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
@@ -120,7 +120,7 @@ func (apiv2 *APIerSv2) LoadTariffPlanFromFolder(attrs *utils.AttrLoadTpFromFolde
|
||||
}
|
||||
loader, err := engine.NewTpReader(apiv2.DataManager.DataDB(),
|
||||
engine.NewFileCSVStorage(utils.CSVSep, attrs.FolderPath), "", apiv2.Config.GeneralCfg().DefaultTimezone,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().SchedulerConns,
|
||||
apiv2.Config.ApierCfg().CachesConns, apiv2.Config.ApierCfg().ActionConns,
|
||||
apiv2.Config.DataDbCfg().Type == utils.INTERNAL)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -152,7 +152,7 @@ func (apiv2 *APIerSv2) LoadTariffPlanFromFolder(attrs *utils.AttrLoadTpFromFolde
|
||||
if err := loader.ReloadCache(caching, true, attrs.APIOpts); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(apiv2.Config.ApierCfg().SchedulerConns) != 0 {
|
||||
if len(apiv2.Config.ApierCfg().ActionConns) != 0 {
|
||||
utils.Logger.Info("APIerSv2.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
if err := loader.ReloadScheduler(true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
@@ -644,9 +644,6 @@ func main() {
|
||||
routeS := services.NewRouteService(cfg, dmService, cacheS, filterSChan, server,
|
||||
internalRouteSChan, connManager, anz, srvDep)
|
||||
|
||||
schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan,
|
||||
server, internalSchedulerSChan, connManager, anz, srvDep)
|
||||
|
||||
rals := services.NewRalService(cfg, cacheS, server,
|
||||
internalRALsChan, internalResponderChan,
|
||||
shdChan, connManager, anz, srvDep)
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
type ApierCfg struct {
|
||||
Enabled bool
|
||||
CachesConns []string // connections towards Cache
|
||||
SchedulerConns []string // connections towards Scheduler
|
||||
ActionConns []string // connections towards Scheduler
|
||||
AttributeSConns []string // connections towards AttributeS
|
||||
EEsConns []string // connections towards EEs
|
||||
}
|
||||
@@ -49,12 +49,12 @@ func (aCfg *ApierCfg) loadFromJSONCfg(jsnCfg *ApierJsonCfg) (err error) {
|
||||
}
|
||||
}
|
||||
if jsnCfg.Scheduler_conns != nil {
|
||||
aCfg.SchedulerConns = make([]string, len(*jsnCfg.Scheduler_conns))
|
||||
aCfg.ActionConns = make([]string, len(*jsnCfg.Scheduler_conns))
|
||||
for idx, conn := range *jsnCfg.Scheduler_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
aCfg.SchedulerConns[idx] = conn
|
||||
aCfg.ActionConns[idx] = conn
|
||||
if conn == utils.MetaInternal {
|
||||
aCfg.SchedulerConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler)
|
||||
aCfg.ActionConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -96,9 +96,9 @@ func (aCfg *ApierCfg) AsMapInterface() (initialMap map[string]interface{}) {
|
||||
}
|
||||
initialMap[utils.CachesConnsCfg] = cachesConns
|
||||
}
|
||||
if aCfg.SchedulerConns != nil {
|
||||
schedulerConns := make([]string, len(aCfg.SchedulerConns))
|
||||
for i, item := range aCfg.SchedulerConns {
|
||||
if aCfg.ActionConns != nil {
|
||||
schedulerConns := make([]string, len(aCfg.ActionConns))
|
||||
for i, item := range aCfg.ActionConns {
|
||||
schedulerConns[i] = item
|
||||
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler) {
|
||||
schedulerConns[i] = utils.MetaInternal
|
||||
@@ -140,10 +140,10 @@ func (aCfg ApierCfg) Clone() (cln *ApierCfg) {
|
||||
cln.CachesConns[i] = k
|
||||
}
|
||||
}
|
||||
if aCfg.SchedulerConns != nil {
|
||||
cln.SchedulerConns = make([]string, len(aCfg.SchedulerConns))
|
||||
for i, k := range aCfg.SchedulerConns {
|
||||
cln.SchedulerConns[i] = k
|
||||
if aCfg.ActionConns != nil {
|
||||
cln.ActionConns = make([]string, len(aCfg.ActionConns))
|
||||
for i, k := range aCfg.ActionConns {
|
||||
cln.ActionConns[i] = k
|
||||
}
|
||||
}
|
||||
if aCfg.AttributeSConns != nil {
|
||||
|
||||
@@ -162,7 +162,6 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
|
||||
cfg.ralsCfg = new(RalsCfg)
|
||||
cfg.ralsCfg.MaxComputedUsage = make(map[string]time.Duration)
|
||||
cfg.ralsCfg.BalanceRatingSubject = make(map[string]string)
|
||||
cfg.schedulerCfg = new(SchedulerCfg)
|
||||
cfg.cdrsCfg = new(CdrsCfg)
|
||||
cfg.analyzerSCfg = new(AnalyzerSCfg)
|
||||
cfg.sessionSCfg = new(SessionSCfg)
|
||||
@@ -308,7 +307,6 @@ type CGRConfig struct {
|
||||
httpCfg *HTTPCfg // HTTP config
|
||||
filterSCfg *FilterSCfg // FilterS config
|
||||
ralsCfg *RalsCfg // Rals config
|
||||
schedulerCfg *SchedulerCfg // Scheduler config
|
||||
cdrsCfg *CdrsCfg // Cdrs config
|
||||
sessionSCfg *SessionSCfg // SessionS config
|
||||
fsAgentCfg *FsAgentCfg // FreeSWITCHAgent config
|
||||
@@ -399,7 +397,7 @@ func (cfg *CGRConfig) loadFromJSONCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
cfg.loadRPCConns,
|
||||
cfg.loadGeneralCfg, cfg.loadTemplateSCfg, cfg.loadCacheCfg, cfg.loadListenCfg,
|
||||
cfg.loadHTTPCfg, cfg.loadDataDBCfg, cfg.loadStorDBCfg,
|
||||
cfg.loadFilterSCfg, cfg.loadRalSCfg, cfg.loadSchedulerCfg,
|
||||
cfg.loadFilterSCfg, cfg.loadRalSCfg,
|
||||
cfg.loadCdrsCfg, cfg.loadSessionSCfg,
|
||||
cfg.loadFreeswitchAgentCfg, cfg.loadKamAgentCfg,
|
||||
cfg.loadAsteriskAgentCfg, cfg.loadDiameterAgentCfg, cfg.loadRadiusAgentCfg,
|
||||
@@ -539,15 +537,6 @@ func (cfg *CGRConfig) loadRalSCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
return cfg.ralsCfg.loadFromJSONCfg(jsnRALsCfg)
|
||||
}
|
||||
|
||||
// loadSchedulerCfg loads the Scheduler section of the configuration
|
||||
func (cfg *CGRConfig) loadSchedulerCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
var jsnSchedCfg *SchedulerJsonCfg
|
||||
if jsnSchedCfg, err = jsnCfg.SchedulerJsonCfg(); err != nil {
|
||||
return
|
||||
}
|
||||
return cfg.schedulerCfg.loadFromJSONCfg(jsnSchedCfg)
|
||||
}
|
||||
|
||||
// loadCdrsCfg loads the Cdrs section of the configuration
|
||||
func (cfg *CGRConfig) loadCdrsCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
var jsnCdrsCfg *CdrsJsonCfg
|
||||
@@ -1029,13 +1018,6 @@ func (cfg *CGRConfig) MigratorCgrCfg() *MigratorCgrCfg {
|
||||
return cfg.migratorCgrCfg
|
||||
}
|
||||
|
||||
// SchedulerCfg returns the config for Scheduler
|
||||
func (cfg *CGRConfig) SchedulerCfg() *SchedulerCfg {
|
||||
cfg.lks[SCHEDULER_JSN].Lock()
|
||||
defer cfg.lks[SCHEDULER_JSN].Unlock()
|
||||
return cfg.schedulerCfg
|
||||
}
|
||||
|
||||
// DataDbCfg returns the config for DataDb
|
||||
func (cfg *CGRConfig) DataDbCfg() *DataDbCfg {
|
||||
cfg.lks[DATADB_JSN].Lock()
|
||||
@@ -1270,7 +1252,6 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error {
|
||||
LISTEN_JSN: cfg.loadListenCfg,
|
||||
TlsCfgJson: cfg.loadTLSCgrCfg,
|
||||
HTTP_JSN: cfg.loadHTTPCfg,
|
||||
SCHEDULER_JSN: cfg.loadSchedulerCfg,
|
||||
CACHE_JSN: cfg.loadCacheCfg,
|
||||
FilterSjsn: cfg.loadFilterSCfg,
|
||||
RALS_JSN: cfg.loadRalSCfg,
|
||||
@@ -1467,7 +1448,7 @@ func (cfg *CGRConfig) loadCfgFromJSONWithLocks(rdr io.Reader, sections []string)
|
||||
// reloadSections sends a signal to the reload channel for the needed sections
|
||||
// the list of sections should be always valid because we load the config first with this list
|
||||
func (cfg *CGRConfig) reloadSections(sections ...string) {
|
||||
subsystemsThatNeedDataDB := utils.NewStringSet([]string{DATADB_JSN, SCHEDULER_JSN,
|
||||
subsystemsThatNeedDataDB := utils.NewStringSet([]string{DATADB_JSN,
|
||||
RALS_JSN, CDRS_JSN, SessionSJson, ATTRIBUTE_JSN,
|
||||
ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
|
||||
RouteSJson, LoaderJson, DispatcherSJson, RateSJson, ApierS, AccountSCfgJson,
|
||||
@@ -1510,8 +1491,6 @@ func (cfg *CGRConfig) reloadSections(sections ...string) {
|
||||
case CoreSCfgJson: // nothing to reload
|
||||
case HTTP_JSN:
|
||||
cfg.rldChans[HTTP_JSN] <- struct{}{}
|
||||
case SCHEDULER_JSN:
|
||||
cfg.rldChans[SCHEDULER_JSN] <- struct{}{}
|
||||
case RALS_JSN:
|
||||
cfg.rldChans[RALS_JSN] <- struct{}{}
|
||||
case CDRS_JSN:
|
||||
@@ -1585,7 +1564,6 @@ func (cfg *CGRConfig) AsMapInterface(separator string) (mp map[string]interface{
|
||||
HTTP_JSN: cfg.httpCfg.AsMapInterface(),
|
||||
FilterSjsn: cfg.filterSCfg.AsMapInterface(),
|
||||
RALS_JSN: cfg.ralsCfg.AsMapInterface(),
|
||||
SCHEDULER_JSN: cfg.schedulerCfg.AsMapInterface(),
|
||||
CDRS_JSN: cfg.cdrsCfg.AsMapInterface(),
|
||||
SessionSJson: cfg.sessionSCfg.AsMapInterface(),
|
||||
FreeSWITCHAgentJSN: cfg.fsAgentCfg.AsMapInterface(separator),
|
||||
@@ -1712,8 +1690,6 @@ func (cfg *CGRConfig) V1GetConfig(args *SectionWithAPIOpts, reply *map[string]in
|
||||
mp = cfg.FilterSCfg().AsMapInterface()
|
||||
case RALS_JSN:
|
||||
mp = cfg.RalsCfg().AsMapInterface()
|
||||
case SCHEDULER_JSN:
|
||||
mp = cfg.SchedulerCfg().AsMapInterface()
|
||||
case CDRS_JSN:
|
||||
mp = cfg.CdrsCfg().AsMapInterface()
|
||||
case SessionSJson:
|
||||
@@ -1882,8 +1858,6 @@ func (cfg *CGRConfig) V1GetConfigAsJSON(args *SectionWithAPIOpts, reply *string)
|
||||
mp = cfg.FilterSCfg().AsMapInterface()
|
||||
case RALS_JSN:
|
||||
mp = cfg.RalsCfg().AsMapInterface()
|
||||
case SCHEDULER_JSN:
|
||||
mp = cfg.SchedulerCfg().AsMapInterface()
|
||||
case CDRS_JSN:
|
||||
mp = cfg.CdrsCfg().AsMapInterface()
|
||||
case SessionSJson:
|
||||
@@ -2019,7 +1993,6 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) {
|
||||
httpCfg: cfg.httpCfg.Clone(),
|
||||
filterSCfg: cfg.filterSCfg.Clone(),
|
||||
ralsCfg: cfg.ralsCfg.Clone(),
|
||||
schedulerCfg: cfg.schedulerCfg.Clone(),
|
||||
cdrsCfg: cfg.cdrsCfg.Clone(),
|
||||
sessionSCfg: cfg.sessionSCfg.Clone(),
|
||||
fsAgentCfg: cfg.fsAgentCfg.Clone(),
|
||||
|
||||
@@ -224,16 +224,6 @@ const CGRATES_CFG_JSON = `
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
"schedulers": {
|
||||
"enabled": false, // start Scheduler service: <true|false>
|
||||
"cdrs_conns": [], // connections to CDRs for *cdrlog actions <""|*internal|$rpc_conns_id>
|
||||
"thresholds_conns": [], // connections to ThresholdS for *reset_threshold action <""|*internal|$rpc_conns_id>
|
||||
"stats_conns": [], // connections to StatS for *reset_stat_queue action: <""|*internal|$rpc_conns_id>
|
||||
"filters": [], // only execute actions matching these filters
|
||||
},
|
||||
|
||||
|
||||
"caches":{
|
||||
"partitions": {
|
||||
"*destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // destination caching
|
||||
|
||||
@@ -31,7 +31,6 @@ const (
|
||||
STORDB_JSN = "stor_db"
|
||||
FilterSjsn = "filters"
|
||||
RALS_JSN = "rals"
|
||||
SCHEDULER_JSN = "schedulers"
|
||||
CDRS_JSN = "cdrs"
|
||||
SessionSJson = "sessions"
|
||||
FreeSWITCHAgentJSN = "freeswitch_agent"
|
||||
@@ -71,7 +70,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN,
|
||||
sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN,
|
||||
CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN,
|
||||
KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON,
|
||||
THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
|
||||
@@ -185,18 +184,6 @@ func (jsnCfg CgrJsonCfg) RalsJsonCfg() (*RalsJsonCfg, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (jsnCfg CgrJsonCfg) SchedulerJsonCfg() (*SchedulerJsonCfg, error) {
|
||||
rawCfg, hasKey := jsnCfg[SCHEDULER_JSN]
|
||||
if !hasKey {
|
||||
return nil, nil
|
||||
}
|
||||
cfg := new(SchedulerJsonCfg)
|
||||
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (jsnCfg CgrJsonCfg) CdrsJsonCfg() (*CdrsJsonCfg, error) {
|
||||
rawCfg, hasKey := jsnCfg[CDRS_JSN]
|
||||
if !hasKey {
|
||||
|
||||
@@ -501,33 +501,6 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Scheduler check connection with CDR Server
|
||||
if cfg.schedulerCfg.Enabled {
|
||||
for _, connID := range cfg.schedulerCfg.CDRsConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.cdrsCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.CDRs, utils.SchedulerS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SchedulerS, connID)
|
||||
}
|
||||
}
|
||||
for _, connID := range cfg.schedulerCfg.ThreshSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ThresholdS, utils.SchedulerS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SchedulerS, connID)
|
||||
}
|
||||
}
|
||||
for _, connID := range cfg.schedulerCfg.StatSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.StatS, utils.SchedulerS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SchedulerS, connID)
|
||||
}
|
||||
}
|
||||
}
|
||||
// EventReader sanity checks
|
||||
if cfg.ersCfg.Enabled {
|
||||
for _, connID := range cfg.ersCfg.SessionSConns {
|
||||
@@ -678,8 +651,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.APIerSv1, connID)
|
||||
}
|
||||
}
|
||||
for _, connID := range cfg.apier.SchedulerConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.schedulerCfg.Enabled {
|
||||
for _, connID := range cfg.apier.ActionConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.actionSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.SchedulerS, utils.APIerSv1)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
|
||||
@@ -129,15 +129,6 @@ type RalsJsonCfg struct {
|
||||
Dynaprepaid_actionplans *[]string
|
||||
}
|
||||
|
||||
// Scheduler config section
|
||||
type SchedulerJsonCfg struct {
|
||||
Enabled *bool
|
||||
Cdrs_conns *[]string
|
||||
Thresholds_conns *[]string
|
||||
Stats_conns *[]string
|
||||
Filters *[]string
|
||||
}
|
||||
|
||||
// Cdrs config section
|
||||
type CdrsJsonCfg struct {
|
||||
Enabled *bool
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// SchedulerCfg the condig section for scheduler
|
||||
type SchedulerCfg struct {
|
||||
Enabled bool
|
||||
CDRsConns []string
|
||||
ThreshSConns []string
|
||||
StatSConns []string
|
||||
Filters []string
|
||||
}
|
||||
|
||||
func (schdcfg *SchedulerCfg) loadFromJSONCfg(jsnCfg *SchedulerJsonCfg) error {
|
||||
if jsnCfg == nil {
|
||||
return nil
|
||||
}
|
||||
if jsnCfg.Enabled != nil {
|
||||
schdcfg.Enabled = *jsnCfg.Enabled
|
||||
}
|
||||
if jsnCfg.Cdrs_conns != nil {
|
||||
schdcfg.CDRsConns = make([]string, len(*jsnCfg.Cdrs_conns))
|
||||
for idx, conn := range *jsnCfg.Cdrs_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
schdcfg.CDRsConns[idx] = conn
|
||||
if conn == utils.MetaInternal {
|
||||
schdcfg.CDRsConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.Filters != nil {
|
||||
schdcfg.Filters = make([]string, len(*jsnCfg.Filters))
|
||||
for i, fltr := range *jsnCfg.Filters {
|
||||
schdcfg.Filters[i] = fltr
|
||||
}
|
||||
}
|
||||
if jsnCfg.Thresholds_conns != nil {
|
||||
schdcfg.ThreshSConns = make([]string, len(*jsnCfg.Thresholds_conns))
|
||||
for idx, connID := range *jsnCfg.Thresholds_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
schdcfg.ThreshSConns[idx] = connID
|
||||
if connID == utils.MetaInternal {
|
||||
schdcfg.ThreshSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.Stats_conns != nil {
|
||||
schdcfg.StatSConns = make([]string, len(*jsnCfg.Stats_conns))
|
||||
for idx, connID := range *jsnCfg.Stats_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
schdcfg.StatSConns[idx] = connID
|
||||
if connID == utils.MetaInternal {
|
||||
schdcfg.StatSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsMapInterface returns the config as a map[string]interface{}
|
||||
func (schdcfg *SchedulerCfg) AsMapInterface() (initialMP map[string]interface{}) {
|
||||
initialMP = map[string]interface{}{
|
||||
utils.EnabledCfg: schdcfg.Enabled,
|
||||
utils.FiltersCfg: schdcfg.Filters,
|
||||
}
|
||||
if schdcfg.CDRsConns != nil {
|
||||
cdrsConns := make([]string, len(schdcfg.CDRsConns))
|
||||
for i, item := range schdcfg.CDRsConns {
|
||||
cdrsConns[i] = item
|
||||
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs) {
|
||||
cdrsConns[i] = utils.MetaInternal
|
||||
}
|
||||
}
|
||||
initialMP[utils.CDRsConnsCfg] = cdrsConns
|
||||
}
|
||||
if schdcfg.ThreshSConns != nil {
|
||||
thrsConns := make([]string, len(schdcfg.ThreshSConns))
|
||||
for i, item := range schdcfg.ThreshSConns {
|
||||
thrsConns[i] = item
|
||||
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
|
||||
thrsConns[i] = utils.MetaInternal
|
||||
}
|
||||
}
|
||||
initialMP[utils.ThreshSConnsCfg] = thrsConns
|
||||
}
|
||||
if schdcfg.StatSConns != nil {
|
||||
stsConns := make([]string, len(schdcfg.StatSConns))
|
||||
for i, item := range schdcfg.StatSConns {
|
||||
stsConns[i] = item
|
||||
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats) {
|
||||
stsConns[i] = utils.MetaInternal
|
||||
}
|
||||
}
|
||||
initialMP[utils.StatSConnsCfg] = stsConns
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Clone returns a deep copy of SchedulerCfg
|
||||
func (schdcfg SchedulerCfg) Clone() (cln *SchedulerCfg) {
|
||||
cln = &SchedulerCfg{
|
||||
Enabled: schdcfg.Enabled,
|
||||
}
|
||||
if schdcfg.CDRsConns != nil {
|
||||
cln.CDRsConns = make([]string, len(schdcfg.CDRsConns))
|
||||
for i, con := range schdcfg.CDRsConns {
|
||||
cln.CDRsConns[i] = con
|
||||
}
|
||||
}
|
||||
if schdcfg.ThreshSConns != nil {
|
||||
cln.ThreshSConns = make([]string, len(schdcfg.ThreshSConns))
|
||||
for i, con := range schdcfg.ThreshSConns {
|
||||
cln.ThreshSConns[i] = con
|
||||
}
|
||||
}
|
||||
if schdcfg.StatSConns != nil {
|
||||
cln.StatSConns = make([]string, len(schdcfg.StatSConns))
|
||||
for i, con := range schdcfg.StatSConns {
|
||||
cln.StatSConns[i] = con
|
||||
}
|
||||
}
|
||||
if schdcfg.Filters != nil {
|
||||
cln.Filters = make([]string, len(schdcfg.Filters))
|
||||
for i, con := range schdcfg.Filters {
|
||||
cln.Filters[i] = con
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
@@ -1,118 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package config
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestSchedulerCfgloadFromJsonCfg(t *testing.T) {
|
||||
cfgJSONS := &SchedulerJsonCfg{
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Cdrs_conns: &[]string{utils.MetaInternal, "*conn1"},
|
||||
Thresholds_conns: &[]string{utils.MetaInternal, "*conn1"},
|
||||
Stats_conns: &[]string{utils.MetaInternal, "*conn1"},
|
||||
Filters: &[]string{"randomFilter"},
|
||||
}
|
||||
expected := &SchedulerCfg{
|
||||
Enabled: true,
|
||||
CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), "*conn1"},
|
||||
ThreshSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), "*conn1"},
|
||||
StatSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), "*conn1"},
|
||||
Filters: []string{"randomFilter"},
|
||||
}
|
||||
jsonCfg := NewDefaultCGRConfig()
|
||||
if err = jsonCfg.schedulerCfg.loadFromJSONCfg(cfgJSONS); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expected, jsonCfg.schedulerCfg) {
|
||||
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(jsonCfg.schedulerCfg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSchedulerCfgAsMapInterface(t *testing.T) {
|
||||
cfgJSONStr := `{
|
||||
"schedulers": {},
|
||||
}`
|
||||
eMap := map[string]interface{}{
|
||||
utils.EnabledCfg: false,
|
||||
utils.CDRsConnsCfg: []string{},
|
||||
utils.ThreshSConnsCfg: []string{},
|
||||
utils.StatSConnsCfg: []string{},
|
||||
utils.FiltersCfg: []string{},
|
||||
}
|
||||
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
|
||||
t.Error(err)
|
||||
} else if rcv := cgrCfg.schedulerCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) {
|
||||
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSchedulerCfgAsMapInterface1(t *testing.T) {
|
||||
cfgJSONStr := `{
|
||||
"schedulers": {
|
||||
"enabled": true,
|
||||
"cdrs_conns": ["*internal", "*conn1"],
|
||||
"thresholds_conns": ["*internal", "*conn1"],
|
||||
"stats_conns": ["*internal", "*conn1"],
|
||||
"filters": ["randomFilter"],
|
||||
},
|
||||
}`
|
||||
eMap := map[string]interface{}{
|
||||
utils.EnabledCfg: true,
|
||||
utils.CDRsConnsCfg: []string{utils.MetaInternal, "*conn1"},
|
||||
utils.ThreshSConnsCfg: []string{utils.MetaInternal, "*conn1"},
|
||||
utils.StatSConnsCfg: []string{utils.MetaInternal, "*conn1"},
|
||||
utils.FiltersCfg: []string{"randomFilter"},
|
||||
}
|
||||
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
|
||||
t.Error(err)
|
||||
} else if rcv := cgrCfg.schedulerCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) {
|
||||
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSchedulerCfgClone(t *testing.T) {
|
||||
ban := &SchedulerCfg{
|
||||
Enabled: true,
|
||||
CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), "*conn1"},
|
||||
ThreshSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), "*conn1"},
|
||||
StatSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), "*conn1"},
|
||||
Filters: []string{"randomFilter"},
|
||||
}
|
||||
rcv := ban.Clone()
|
||||
if !reflect.DeepEqual(ban, rcv) {
|
||||
t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(ban), utils.ToJSON(rcv))
|
||||
}
|
||||
if rcv.CDRsConns[1] = ""; ban.CDRsConns[1] != "*conn1" {
|
||||
t.Errorf("Expected clone to not modify the cloned")
|
||||
}
|
||||
if rcv.ThreshSConns[1] = ""; ban.ThreshSConns[1] != "*conn1" {
|
||||
t.Errorf("Expected clone to not modify the cloned")
|
||||
}
|
||||
if rcv.StatSConns[1] = ""; ban.StatSConns[1] != "*conn1" {
|
||||
t.Errorf("Expected clone to not modify the cloned")
|
||||
}
|
||||
if rcv.Filters[0] = ""; ban.Filters[0] != "randomFilter" {
|
||||
t.Errorf("Expected clone to not modify the cloned")
|
||||
}
|
||||
}
|
||||
@@ -1,65 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func init() {
|
||||
c := &CmdExecuteScheduledActions{
|
||||
name: "scheduler_execute",
|
||||
rpcMethod: utils.SchedulerSv1ExecuteActions,
|
||||
rpcParams: &utils.AttrsExecuteActions{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdExecuteScheduledActions struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *utils.AttrsExecuteActions
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &utils.AttrsExecuteActions{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestCmdSchedulerExecute(t *testing.T) {
|
||||
// commands map is initiated in init function
|
||||
command := commands["scheduler_execute"]
|
||||
// verify if ApierSv1 object has method on it
|
||||
m, ok := reflect.TypeOf(new(v1.SchedulerSv1)).MethodByName(strings.Split(command.RpcMethod(), utils.NestingSep)[1])
|
||||
if !ok {
|
||||
t.Fatal("method not found")
|
||||
}
|
||||
if m.Type.NumIn() != 3 { // ApierSv1 is consider and we expect 3 inputs
|
||||
t.Fatalf("invalid number of input parameters ")
|
||||
}
|
||||
// verify the type of input parameter
|
||||
if ok := m.Type.In(1).AssignableTo(reflect.TypeOf(command.RpcParams(true))); !ok {
|
||||
t.Fatalf("cannot assign input parameter")
|
||||
}
|
||||
// verify the type of output parameter
|
||||
if ok := m.Type.In(2).AssignableTo(reflect.TypeOf(command.RpcResult())); !ok {
|
||||
t.Fatalf("cannot assign output parameter")
|
||||
}
|
||||
// for coverage purpose
|
||||
if err := command.PostprocessRpcParams(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func init() {
|
||||
c := &CmdGetScheduledActions{
|
||||
name: "scheduler_queue",
|
||||
rpcMethod: utils.APIerSv1GetScheduledActions,
|
||||
rpcParams: &scheduler.ArgsGetScheduledActions{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdGetScheduledActions struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *scheduler.ArgsGetScheduledActions
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &scheduler.ArgsGetScheduledActions{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) RpcResult() interface{} {
|
||||
s := make([]*scheduler.ScheduledAction, 0)
|
||||
return &s
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestCmdSchedulerQueue(t *testing.T) {
|
||||
// commands map is initiated in init function
|
||||
command := commands["scheduler_queue"]
|
||||
// verify if ApierSv1 object has method on it
|
||||
m, ok := reflect.TypeOf(new(v1.APIerSv1)).MethodByName(strings.Split(command.RpcMethod(), utils.NestingSep)[1])
|
||||
if !ok {
|
||||
t.Fatal("method not found")
|
||||
}
|
||||
if m.Type.NumIn() != 3 { // ApierSv1 is consider and we expect 3 inputs
|
||||
t.Fatalf("invalid number of input parameters ")
|
||||
}
|
||||
// verify the type of input parameter
|
||||
if ok := m.Type.In(1).AssignableTo(reflect.TypeOf(command.RpcParams(true))); !ok {
|
||||
t.Fatalf("cannot assign input parameter")
|
||||
}
|
||||
// verify the type of output parameter
|
||||
if ok := m.Type.In(2).AssignableTo(reflect.TypeOf(command.RpcResult())); !ok {
|
||||
t.Fatalf("cannot assign output parameter")
|
||||
}
|
||||
// for coverage purpose
|
||||
if err := command.PostprocessRpcParams(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func init() {
|
||||
c := &CmdReloadScheduler{
|
||||
name: "scheduler_reload",
|
||||
rpcMethod: utils.SchedulerSv1Reload,
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdReloadScheduler struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *utils.CGREvent
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdReloadScheduler) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdReloadScheduler) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdReloadScheduler) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &utils.CGREvent{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdReloadScheduler) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdReloadScheduler) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestCmdSchedulerReload(t *testing.T) {
|
||||
// commands map is initiated in init function
|
||||
command := commands["scheduler_reload"]
|
||||
// verify if ApierSv1 object has method on it
|
||||
m, ok := reflect.TypeOf(new(v1.SchedulerSv1)).MethodByName(strings.Split(command.RpcMethod(), utils.NestingSep)[1])
|
||||
if !ok {
|
||||
t.Fatal("method not found")
|
||||
}
|
||||
if m.Type.NumIn() != 3 { // ApierSv1 is consider and we expect 3 inputs
|
||||
t.Fatalf("invalid number of input parameters ")
|
||||
}
|
||||
// verify the type of input parameter
|
||||
if ok := m.Type.In(1).AssignableTo(reflect.TypeOf(command.RpcParams(true))); !ok {
|
||||
t.Fatalf("cannot assign input parameter")
|
||||
}
|
||||
// verify the type of output parameter
|
||||
if ok := m.Type.In(2).AssignableTo(reflect.TypeOf(command.RpcResult())); !ok {
|
||||
t.Fatalf("cannot assign output parameter")
|
||||
}
|
||||
// for coverage purpose
|
||||
if err := command.PostprocessRpcParams(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
1268
engine/account.go
1268
engine/account.go
File diff suppressed because it is too large
Load Diff
1090
engine/action.go
1090
engine/action.go
File diff suppressed because it is too large
Load Diff
@@ -1,368 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/gorhill/cronexpr"
|
||||
)
|
||||
|
||||
const (
|
||||
FORMAT = "2006-1-2 15:04:05 MST"
|
||||
)
|
||||
|
||||
type ActionTiming struct {
|
||||
Uuid string
|
||||
Timing *RateInterval
|
||||
ActionsID string
|
||||
ExtraData interface{}
|
||||
Weight float64
|
||||
actions Actions
|
||||
accountIDs utils.StringMap // copy of action plans accounts
|
||||
actionPlanID string // the id of the belonging action plan (info only)
|
||||
stCache time.Time // cached time of the next start
|
||||
}
|
||||
|
||||
// Tasks converts an ActionTiming into multiple Tasks
|
||||
func (at *ActionTiming) Tasks() (tsks []*Task) {
|
||||
if len(at.accountIDs) == 0 {
|
||||
return []*Task{{
|
||||
Uuid: at.Uuid,
|
||||
ActionsID: at.ActionsID,
|
||||
}}
|
||||
}
|
||||
tsks = make([]*Task, len(at.accountIDs))
|
||||
i := 0
|
||||
for acntID := range at.accountIDs {
|
||||
tsks[i] = &Task{
|
||||
Uuid: at.Uuid,
|
||||
ActionsID: at.ActionsID,
|
||||
AccountID: acntID,
|
||||
}
|
||||
i++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type ActionPlan struct {
|
||||
Id string // informative purpose only
|
||||
AccountIDs utils.StringMap
|
||||
ActionTimings []*ActionTiming
|
||||
}
|
||||
|
||||
func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) {
|
||||
if _, found = apl.AccountIDs[accID]; found {
|
||||
delete(apl.AccountIDs, accID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Clone clones *ActionPlan
|
||||
func (apl *ActionPlan) Clone() (interface{}, error) {
|
||||
cln := &ActionPlan{
|
||||
Id: apl.Id,
|
||||
AccountIDs: apl.AccountIDs.Clone(),
|
||||
}
|
||||
if apl.ActionTimings != nil {
|
||||
cln.ActionTimings = make([]*ActionTiming, len(apl.ActionTimings))
|
||||
for i, act := range apl.ActionTimings {
|
||||
cln.ActionTimings[i] = act.Clone()
|
||||
}
|
||||
}
|
||||
return cln, nil
|
||||
}
|
||||
|
||||
// Clone clones ActionTiming
|
||||
func (at *ActionTiming) Clone() (cln *ActionTiming) {
|
||||
if at == nil {
|
||||
return
|
||||
}
|
||||
cln = &ActionTiming{
|
||||
Uuid: at.Uuid,
|
||||
ActionsID: at.ActionsID,
|
||||
Weight: at.Weight,
|
||||
ExtraData: at.ExtraData,
|
||||
Timing: at.Timing.Clone(),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// getDayOrEndOfMonth returns the day if is a valid date relative to t1 month
|
||||
func getDayOrEndOfMonth(day int, t1 time.Time) int {
|
||||
if lastDay := utils.GetEndOfMonth(t1).Day(); lastDay <= day { // clamp the day to last day of month in order to corectly compare the time
|
||||
day = lastDay
|
||||
}
|
||||
return day
|
||||
}
|
||||
|
||||
func (at *ActionTiming) GetNextStartTime(t1 time.Time) (t time.Time) {
|
||||
if !at.stCache.IsZero() {
|
||||
return at.stCache
|
||||
}
|
||||
i := at.Timing
|
||||
if i == nil || i.Timing == nil {
|
||||
return
|
||||
}
|
||||
// Normalize
|
||||
if i.Timing.StartTime == "" {
|
||||
i.Timing.StartTime = "00:00:00"
|
||||
}
|
||||
if len(i.Timing.Years) > 0 && len(i.Timing.Months) == 0 {
|
||||
i.Timing.Months = append(i.Timing.Months, 1)
|
||||
}
|
||||
if len(i.Timing.Months) > 0 && len(i.Timing.MonthDays) == 0 {
|
||||
i.Timing.MonthDays = append(i.Timing.MonthDays, 1)
|
||||
}
|
||||
at.stCache = cronexpr.MustParse(i.Timing.CronString()).Next(t1)
|
||||
if i.Timing.ID == utils.MetaMonthlyEstimated {
|
||||
// substract a month from at.stCache only if we skip 2 months
|
||||
// or we skip a month because mentioned MonthDay is after the last day of the current month
|
||||
if at.stCache.Month() == t1.Month()+2 ||
|
||||
(utils.GetEndOfMonth(t1).Day() < at.Timing.Timing.MonthDays[0] &&
|
||||
at.stCache.Month() == t1.Month()+1) {
|
||||
lastDay := utils.GetEndOfMonth(at.stCache).Day()
|
||||
// only change the time if the new one is after t1
|
||||
if tmp := at.stCache.AddDate(0, 0, -lastDay); tmp.After(t1) {
|
||||
at.stCache = tmp
|
||||
}
|
||||
}
|
||||
}
|
||||
return at.stCache
|
||||
}
|
||||
|
||||
func (at *ActionTiming) ResetStartTimeCache() {
|
||||
at.stCache = time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
func (at *ActionTiming) SetActions(as Actions) {
|
||||
at.actions = as
|
||||
}
|
||||
|
||||
func (at *ActionTiming) SetAccountIDs(accIDs utils.StringMap) {
|
||||
at.accountIDs = accIDs
|
||||
}
|
||||
|
||||
func (at *ActionTiming) RemoveAccountID(acntID string) (found bool) {
|
||||
if _, found = at.accountIDs[acntID]; found {
|
||||
delete(at.accountIDs, acntID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (at *ActionTiming) GetAccountIDs() utils.StringMap {
|
||||
return at.accountIDs
|
||||
}
|
||||
|
||||
func (at *ActionTiming) SetActionPlanID(id string) {
|
||||
at.actionPlanID = id
|
||||
}
|
||||
|
||||
func (at *ActionTiming) GetActionPlanID() string {
|
||||
return at.actionPlanID
|
||||
}
|
||||
|
||||
func (at *ActionTiming) getActions() (as []*Action, err error) {
|
||||
if at.actions == nil {
|
||||
at.actions, err = dm.GetActions(at.ActionsID, false, utils.NonTransactional)
|
||||
}
|
||||
at.actions.Sort()
|
||||
return at.actions, err
|
||||
}
|
||||
|
||||
// Execute will execute all actions in an action plan
|
||||
// Reports on success/fail via channel if != nil
|
||||
func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err error) {
|
||||
at.ResetStartTimeCache()
|
||||
aac, err := at.getActions()
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsID, err))
|
||||
return
|
||||
}
|
||||
var partialyExecuted bool
|
||||
for accID := range at.accountIDs {
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
acc, err := dm.GetAccount(accID)
|
||||
if err != nil { // create account
|
||||
if err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID))
|
||||
return 0, err
|
||||
}
|
||||
err = nil
|
||||
acc = &Account{
|
||||
ID: accID,
|
||||
}
|
||||
}
|
||||
transactionFailed := false
|
||||
removeAccountActionFound := false
|
||||
for _, a := range aac {
|
||||
// check action filter
|
||||
if len(a.Filter) > 0 {
|
||||
matched, err := acc.matchActionFilter(a.Filter)
|
||||
//log.Print("Checkng: ", a.Filter, matched)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !matched {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if a.Balance == nil {
|
||||
a.Balance = &BalanceFilter{}
|
||||
}
|
||||
if a.ExpirationString != "" { // if it's *unlimited then it has to be zero time
|
||||
if expDate, parseErr := utils.ParseTimeDetectLayout(a.ExpirationString,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone); parseErr == nil {
|
||||
a.Balance.ExpirationDate = &time.Time{}
|
||||
*a.Balance.ExpirationDate = expDate
|
||||
}
|
||||
}
|
||||
|
||||
actionFunction, exists := getActionFunc(a.ActionType)
|
||||
if !exists {
|
||||
// do not allow the action plan to be rescheduled
|
||||
at.Timing = nil
|
||||
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
partialyExecuted = true
|
||||
transactionFailed = true
|
||||
if failedActions != nil {
|
||||
go func(a *Action) { failedActions <- a }(a)
|
||||
}
|
||||
break
|
||||
}
|
||||
if err := actionFunction(acc, a, aac, at.ExtraData); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
|
||||
partialyExecuted = true
|
||||
transactionFailed = true
|
||||
if failedActions != nil {
|
||||
go func(a *Action) { failedActions <- a }(a)
|
||||
}
|
||||
break
|
||||
}
|
||||
if successActions != nil {
|
||||
go func(a *Action) { successActions <- a }(a)
|
||||
}
|
||||
if a.ActionType == utils.MetaRemoveAccount {
|
||||
removeAccountActionFound = true
|
||||
}
|
||||
}
|
||||
if !transactionFailed && !removeAccountActionFound {
|
||||
dm.SetAccount(acc)
|
||||
}
|
||||
return 0, nil
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accID)
|
||||
}
|
||||
//reset the error in case that the account is not found
|
||||
err = nil
|
||||
if len(at.accountIDs) == 0 { // action timing executing without accounts
|
||||
for _, a := range aac {
|
||||
if expDate, parseErr := utils.ParseTimeDetectLayout(a.ExpirationString,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone); (a.Balance == nil || a.Balance.EmptyExpirationDate()) &&
|
||||
parseErr == nil && !expDate.IsZero() {
|
||||
a.Balance.ExpirationDate = &time.Time{}
|
||||
*a.Balance.ExpirationDate = expDate
|
||||
}
|
||||
|
||||
actionFunction, exists := getActionFunc(a.ActionType)
|
||||
if !exists {
|
||||
// do not allow the action plan to be rescheduled
|
||||
at.Timing = nil
|
||||
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
partialyExecuted = true
|
||||
if failedActions != nil {
|
||||
go func(a *Action) { failedActions <- a }(a)
|
||||
}
|
||||
break
|
||||
}
|
||||
if err := actionFunction(nil, a, aac, at.ExtraData); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error executing accountless action %s: %v!", a.ActionType, err))
|
||||
partialyExecuted = true
|
||||
if failedActions != nil {
|
||||
go func(a *Action) { failedActions <- a }(a)
|
||||
}
|
||||
break
|
||||
}
|
||||
if successActions != nil {
|
||||
go func(a *Action) { successActions <- a }(a)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
|
||||
return err
|
||||
}
|
||||
if partialyExecuted {
|
||||
return utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (at *ActionTiming) IsASAP() bool {
|
||||
if at.Timing == nil {
|
||||
return false
|
||||
}
|
||||
return at.Timing.Timing.StartTime == utils.MetaASAP
|
||||
}
|
||||
|
||||
// Structure to store actions according to execution time and weight
|
||||
type ActionTimingPriorityList []*ActionTiming
|
||||
|
||||
func (atpl ActionTimingPriorityList) Len() int {
|
||||
return len(atpl)
|
||||
}
|
||||
|
||||
func (atpl ActionTimingPriorityList) Swap(i, j int) {
|
||||
atpl[i], atpl[j] = atpl[j], atpl[i]
|
||||
}
|
||||
|
||||
func (atpl ActionTimingPriorityList) Less(i, j int) bool {
|
||||
if atpl[i].GetNextStartTime(time.Now()).Equal(atpl[j].GetNextStartTime(time.Now())) {
|
||||
// higher weights earlyer in the list
|
||||
return atpl[i].Weight > atpl[j].Weight
|
||||
}
|
||||
return atpl[i].GetNextStartTime(time.Now()).Before(atpl[j].GetNextStartTime(time.Now()))
|
||||
}
|
||||
|
||||
func (atpl ActionTimingPriorityList) Sort() {
|
||||
sort.Sort(atpl)
|
||||
}
|
||||
|
||||
// Structure to store actions according to weight
|
||||
type ActionTimingWeightOnlyPriorityList []*ActionTiming
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Len() int {
|
||||
return len(atpl)
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Swap(i, j int) {
|
||||
atpl[i], atpl[j] = atpl[j], atpl[i]
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Less(i, j int) bool {
|
||||
return atpl[i].Weight > atpl[j].Weight
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Sort() {
|
||||
sort.Sort(atpl)
|
||||
}
|
||||
@@ -46,7 +46,6 @@ type Balance struct {
|
||||
Factor ValueFactor
|
||||
Blocker bool
|
||||
precision int
|
||||
account *Account // used to store ub reference for shared balances
|
||||
dirty bool
|
||||
}
|
||||
|
||||
|
||||
@@ -1,310 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
sync.RWMutex
|
||||
queue engine.ActionTimingPriorityList
|
||||
timer *time.Timer
|
||||
restartLoop chan struct{}
|
||||
dm *engine.DataManager
|
||||
cfg *config.CGRConfig
|
||||
fltrS *engine.FilterS
|
||||
schedulerStarted bool
|
||||
actStatsInterval time.Duration // How long time to keep the stats in memory
|
||||
actSucessChan, actFailedChan chan *engine.Action // ActionPlan will pass actions via these channels
|
||||
aSMux, aFMux sync.RWMutex // protect schedStats
|
||||
actSuccessStats, actFailedStats map[string]map[time.Time]bool // keep here stats regarding executed actions, map[actionType]map[execTime]bool
|
||||
}
|
||||
|
||||
func NewScheduler(dm *engine.DataManager, cfg *config.CGRConfig,
|
||||
fltrS *engine.FilterS) (s *Scheduler) {
|
||||
s = &Scheduler{
|
||||
restartLoop: make(chan struct{}),
|
||||
dm: dm,
|
||||
cfg: cfg,
|
||||
fltrS: fltrS,
|
||||
}
|
||||
s.Reload()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Scheduler) updateActStats(act *engine.Action, isFailed bool) {
|
||||
mux := &s.aSMux
|
||||
statsMp := s.actSuccessStats
|
||||
if isFailed {
|
||||
mux = &s.aFMux
|
||||
statsMp = s.actFailedStats
|
||||
}
|
||||
now := time.Now()
|
||||
mux.Lock()
|
||||
for aType := range statsMp {
|
||||
for t := range statsMp[aType] {
|
||||
if now.Sub(t) > s.actStatsInterval {
|
||||
delete(statsMp[aType], t)
|
||||
if len(statsMp[aType]) == 0 {
|
||||
delete(statsMp, aType)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if act == nil {
|
||||
return
|
||||
}
|
||||
if _, hasIt := statsMp[act.ActionType]; !hasIt {
|
||||
statsMp[act.ActionType] = make(map[time.Time]bool)
|
||||
}
|
||||
statsMp[act.ActionType][now] = true
|
||||
mux.Unlock()
|
||||
}
|
||||
|
||||
func (s *Scheduler) Loop() {
|
||||
s.schedulerStarted = true
|
||||
for {
|
||||
if !s.schedulerStarted { // shutdown requested
|
||||
break
|
||||
}
|
||||
for len(s.queue) == 0 { //hang here if empty
|
||||
<-s.restartLoop
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
s.Lock()
|
||||
a0 := s.queue[0]
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.ActionsID))
|
||||
now := time.Now()
|
||||
start := a0.GetNextStartTime(now)
|
||||
if start.Equal(now) || start.Before(now) {
|
||||
go a0.Execute(s.actSucessChan, s.actFailedChan)
|
||||
// if after execute the next start time is in the past then
|
||||
// do not add it to the queue
|
||||
a0.ResetStartTimeCache()
|
||||
now = time.Now().Add(time.Second)
|
||||
start = a0.GetNextStartTime(now)
|
||||
if start.Before(now) {
|
||||
s.queue = s.queue[1:]
|
||||
} else {
|
||||
s.queue = append(s.queue, a0)
|
||||
s.queue = s.queue[1:]
|
||||
sort.Sort(s.queue)
|
||||
}
|
||||
s.Unlock()
|
||||
} else {
|
||||
s.Unlock()
|
||||
d := a0.GetNextStartTime(now).Sub(now)
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.ActionsID, d))
|
||||
s.timer = time.NewTimer(d)
|
||||
select {
|
||||
case <-s.timer.C:
|
||||
// timer has expired
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %s", a0.ActionsID))
|
||||
case <-s.restartLoop:
|
||||
// nothing to do, just continue the loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) Reload() {
|
||||
s.loadActionPlans()
|
||||
s.restart()
|
||||
}
|
||||
|
||||
// loadTasks loads the tasks
|
||||
// this will push the tasks that did not match
|
||||
// the filters before exiting the function
|
||||
func (s *Scheduler) loadTasks() {
|
||||
// limit the number of concurrent tasks
|
||||
limit := make(chan struct{}, 10)
|
||||
// if some task don't mach the filter save them in this slice
|
||||
// in oreder to push them back when finish executing them
|
||||
var unexecutedTasks []*engine.Task
|
||||
// execute existing tasks
|
||||
for {
|
||||
task, err := s.dm.DataDB().PopTask()
|
||||
if err != nil || task == nil {
|
||||
break
|
||||
}
|
||||
if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant,
|
||||
s.cfg.SchedulerCfg().Filters, task); err != nil || !pass {
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>",
|
||||
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID))
|
||||
}
|
||||
// we do not push the task back as this may cause an infinite loop
|
||||
// push it when the function is done and we stoped the for
|
||||
// do not use defer here as the functions are exeucted
|
||||
// from the last one to the first
|
||||
unexecutedTasks = append(unexecutedTasks, task)
|
||||
continue
|
||||
}
|
||||
limit <- struct{}{}
|
||||
go func() {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s",
|
||||
utils.SchedulerS, task.ActionsID, task.AccountID))
|
||||
task.Execute()
|
||||
<-limit
|
||||
}()
|
||||
}
|
||||
for _, t := range unexecutedTasks {
|
||||
if err := s.dm.DataDB().PushTask(t); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
|
||||
utils.SchedulerS, t.ActionsID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) loadActionPlans() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.loadTasks()
|
||||
|
||||
actionPlans, err := s.dm.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
|
||||
// recreate the queue
|
||||
s.queue = engine.ActionTimingPriorityList{}
|
||||
for _, actionPlan := range actionPlans {
|
||||
if actionPlan == nil {
|
||||
continue
|
||||
}
|
||||
for _, at := range actionPlan.ActionTimings {
|
||||
if at.Timing == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", at))
|
||||
continue
|
||||
}
|
||||
if at.IsASAP() {
|
||||
continue // should be already executed as task
|
||||
}
|
||||
now := time.Now()
|
||||
if at.GetNextStartTime(now).Before(now) {
|
||||
// the task is obsolete, do not add it to the queue
|
||||
continue
|
||||
}
|
||||
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(actionPlan.Id)
|
||||
for _, task := range at.Tasks() {
|
||||
if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant,
|
||||
s.cfg.SchedulerCfg().Filters, task); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing action <%s> on account <%s>",
|
||||
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID))
|
||||
at.RemoveAccountID(task.AccountID)
|
||||
} else if !pass {
|
||||
at.RemoveAccountID(task.AccountID)
|
||||
}
|
||||
}
|
||||
s.queue = append(s.queue, at)
|
||||
}
|
||||
}
|
||||
sort.Sort(s.queue)
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> queued %d action plans", len(s.queue)))
|
||||
}
|
||||
|
||||
func (s *Scheduler) restart() {
|
||||
if s.schedulerStarted {
|
||||
s.restartLoop <- struct{}{}
|
||||
}
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
type ArgsGetScheduledActions struct {
|
||||
Tenant, Account *string
|
||||
TimeStart, TimeEnd *time.Time // Filter based on next runTime
|
||||
utils.Paginator
|
||||
}
|
||||
|
||||
type ScheduledAction struct {
|
||||
NextRunTime time.Time
|
||||
Accounts int // Number of acccounts this action will run on
|
||||
ActionPlanID, ActionTimingUUID, ActionsID string
|
||||
}
|
||||
|
||||
func (s *Scheduler) GetScheduledActions(fltr ArgsGetScheduledActions) (schedActions []*ScheduledAction) {
|
||||
s.RLock()
|
||||
for _, at := range s.queue {
|
||||
sas := &ScheduledAction{NextRunTime: at.GetNextStartTime(time.Now()), Accounts: len(at.GetAccountIDs()),
|
||||
ActionPlanID: at.GetActionPlanID(), ActionTimingUUID: at.Uuid, ActionsID: at.ActionsID}
|
||||
if fltr.TimeStart != nil && !fltr.TimeStart.IsZero() && sas.NextRunTime.Before(*fltr.TimeStart) {
|
||||
continue // need to match the filter interval
|
||||
}
|
||||
if fltr.TimeEnd != nil && !fltr.TimeEnd.IsZero() && (sas.NextRunTime.After(*fltr.TimeEnd) || sas.NextRunTime.Equal(*fltr.TimeEnd)) {
|
||||
continue
|
||||
}
|
||||
// filter on account
|
||||
if fltr.Tenant != nil || fltr.Account != nil {
|
||||
found := false
|
||||
for accID := range at.GetAccountIDs() {
|
||||
split := strings.Split(accID, utils.ConcatenatedKeySep)
|
||||
if len(split) != 2 {
|
||||
continue // malformed account id
|
||||
}
|
||||
if fltr.Tenant != nil && *fltr.Tenant != split[0] {
|
||||
continue
|
||||
}
|
||||
if fltr.Account != nil && *fltr.Account != split[1] {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
schedActions = append(schedActions, sas)
|
||||
}
|
||||
if fltr.Paginator.Offset != nil {
|
||||
if *fltr.Paginator.Offset <= len(schedActions) {
|
||||
schedActions = schedActions[*fltr.Paginator.Offset:]
|
||||
}
|
||||
}
|
||||
if fltr.Paginator.Limit != nil {
|
||||
if *fltr.Paginator.Limit <= len(schedActions) {
|
||||
schedActions = schedActions[:*fltr.Paginator.Limit]
|
||||
}
|
||||
}
|
||||
s.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Scheduler) Shutdown() {
|
||||
s.schedulerStarted = false // disable loop on next run
|
||||
s.restartLoop <- struct{}{} // cancel waiting tasks
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestSchedulerUpdateActStats(t *testing.T) {
|
||||
sched := &Scheduler{actStatsInterval: time.Millisecond, actSuccessStats: make(map[string]map[time.Time]bool)}
|
||||
sched.updateActStats(&engine.Action{Id: "REMOVE_1", ActionType: utils.MetaRemoveAccount}, false)
|
||||
if len(sched.actSuccessStats[utils.MetaRemoveAccount]) != 1 {
|
||||
t.Errorf("Wrong stats: %+v", sched.actSuccessStats[utils.MetaRemoveAccount])
|
||||
}
|
||||
sched.updateActStats(&engine.Action{Id: "REMOVE_2", ActionType: utils.MetaRemoveAccount}, false)
|
||||
if len(sched.actSuccessStats[utils.MetaRemoveAccount]) != 2 {
|
||||
t.Errorf("Wrong stats: %+v", sched.actSuccessStats[utils.MetaRemoveAccount])
|
||||
}
|
||||
sched.updateActStats(&engine.Action{Id: "LOG1", ActionType: utils.MetaLog}, false)
|
||||
if len(sched.actSuccessStats[utils.MetaLog]) != 1 ||
|
||||
len(sched.actSuccessStats[utils.MetaRemoveAccount]) != 2 {
|
||||
t.Errorf("Wrong stats: %+v", sched.actSuccessStats)
|
||||
}
|
||||
time.Sleep(sched.actStatsInterval)
|
||||
sched.updateActStats(&engine.Action{Id: "REMOVE_3", ActionType: utils.MetaRemoveAccount}, false)
|
||||
if len(sched.actSuccessStats[utils.MetaRemoveAccount]) != 1 || len(sched.actSuccessStats) != 1 {
|
||||
t.Errorf("Wrong stats: %+v", sched.actSuccessStats)
|
||||
}
|
||||
}
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService,
|
||||
storDB *StorDBService, filterSChan chan *engine.FilterS,
|
||||
server *cores.Server,
|
||||
schedService *SchedulerService,
|
||||
actService *ActionService,
|
||||
responderService *ResponderService,
|
||||
internalAPIerSv1Chan chan rpcclient.ClientConnector,
|
||||
connMgr *engine.ConnManager, anz *AnalyzerService,
|
||||
@@ -46,7 +46,7 @@ func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService,
|
||||
storDB: storDB,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
schedService: schedService,
|
||||
actService: actService,
|
||||
responderService: responderService,
|
||||
connMgr: connMgr,
|
||||
APIerSv1Chan: make(chan *v1.APIerSv1, 1),
|
||||
@@ -63,7 +63,7 @@ type APIerSv1Service struct {
|
||||
storDB *StorDBService
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
schedService *SchedulerService
|
||||
actService *ActionService
|
||||
responderService *ResponderService
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
@@ -101,14 +101,14 @@ func (apiService *APIerSv1Service) Start() (err error) {
|
||||
defer apiService.Unlock()
|
||||
|
||||
apiService.api = &v1.APIerSv1{
|
||||
DataManager: datadb,
|
||||
CdrDb: stordb,
|
||||
StorDb: stordb,
|
||||
Config: apiService.cfg,
|
||||
SchedulerService: apiService.schedService,
|
||||
FilterS: filterS,
|
||||
ConnMgr: apiService.connMgr,
|
||||
StorDBChan: storDBChan,
|
||||
DataManager: datadb,
|
||||
CdrDb: stordb,
|
||||
StorDb: stordb,
|
||||
Config: apiService.cfg,
|
||||
ActionService: apiService.actService,
|
||||
FilterS: filterS,
|
||||
ConnMgr: apiService.connMgr,
|
||||
StorDBChan: storDBChan,
|
||||
|
||||
Responder: apiService.responderService.GetResponder(), // if already started use it
|
||||
ResponderChan: respChan, // if not wait in listenAndServe
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewSchedulerService returns the Scheduler Service
|
||||
func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, fltrSChan chan *engine.FilterS,
|
||||
server *cores.Server, internalSchedulerrSChan chan rpcclient.ClientConnector,
|
||||
connMgr *engine.ConnManager, anz *AnalyzerService,
|
||||
srvDep map[string]*sync.WaitGroup) *SchedulerService {
|
||||
return &SchedulerService{
|
||||
connChan: internalSchedulerrSChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
fltrSChan: fltrSChan,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
anz: anz,
|
||||
srvDep: srvDep,
|
||||
}
|
||||
}
|
||||
|
||||
// SchedulerService implements Service interface
|
||||
type SchedulerService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
fltrSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
|
||||
schS *scheduler.Scheduler
|
||||
rpc *v1.SchedulerSv1
|
||||
connChan chan rpcclient.ClientConnector
|
||||
connMgr *engine.ConnManager
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (schS *SchedulerService) Start() (err error) {
|
||||
if schS.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
|
||||
<-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
|
||||
|
||||
fltrS := <-schS.fltrSChan
|
||||
schS.fltrSChan <- fltrS
|
||||
dbchan := schS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
schS.Lock()
|
||||
defer schS.Unlock()
|
||||
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
|
||||
schS.schS = scheduler.NewScheduler(datadb, schS.cfg, fltrS)
|
||||
go schS.schS.Loop()
|
||||
|
||||
schS.rpc = v1.NewSchedulerSv1(schS.cfg, datadb)
|
||||
if !schS.cfg.DispatcherSCfg().Enabled {
|
||||
schS.server.RpcRegister(schS.rpc)
|
||||
}
|
||||
schS.connChan <- schS.anz.GetInternalCodec(schS.rpc, utils.SchedulerS)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (schS *SchedulerService) Reload() (err error) {
|
||||
schS.Lock()
|
||||
schS.schS.Reload()
|
||||
schS.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (schS *SchedulerService) Shutdown() (err error) {
|
||||
schS.Lock()
|
||||
schS.schS.Shutdown()
|
||||
schS.schS = nil
|
||||
schS.rpc = nil
|
||||
<-schS.connChan
|
||||
schS.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (schS *SchedulerService) IsRunning() bool {
|
||||
schS.RLock()
|
||||
defer schS.RUnlock()
|
||||
return schS != nil && schS.schS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (schS *SchedulerService) ServiceName() string {
|
||||
return utils.SchedulerS
|
||||
}
|
||||
|
||||
// GetScheduler returns the Scheduler
|
||||
func (schS *SchedulerService) GetScheduler() *scheduler.Scheduler {
|
||||
schS.RLock()
|
||||
defer schS.RUnlock()
|
||||
return schS.schS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (schS *SchedulerService) ShouldRun() bool {
|
||||
return schS.cfg.SchedulerCfg().Enabled
|
||||
}
|
||||
@@ -1,102 +0,0 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"path"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func TestSchedulerSReload(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
|
||||
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
shdChan := utils.NewSyncedChan()
|
||||
shdWg := new(sync.WaitGroup)
|
||||
chS := engine.NewCacheS(cfg, nil, nil)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
close(chS.GetPrecacheChannel(utils.CacheActionPlans))
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(schS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if schS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
if db.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfig(&config.ReloadArgs{
|
||||
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongonew"),
|
||||
Section: config.SCHEDULER_JSN,
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expecting OK ,received %s", reply)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
|
||||
if !schS.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
if !db.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
err := schS.Start()
|
||||
if err == nil || err != utils.ErrServiceAlreadyRunning {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err)
|
||||
}
|
||||
err = schS.Reload()
|
||||
if err != nil {
|
||||
t.Errorf("\nExpecting <nil>,\n Received <%+v>", err)
|
||||
}
|
||||
getScheduler := schS.GetScheduler()
|
||||
if !reflect.DeepEqual(schS.schS, getScheduler) {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ToJSON(schS.schS), utils.ToJSON(getScheduler))
|
||||
}
|
||||
cfg.SchedulerCfg().Enabled = false
|
||||
cfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if schS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
shdChan.CloseOnce()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
//TestSchedulerSCoverage for cover testing
|
||||
func TestSchedulerSCoverage(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
shdChan := utils.NewSyncedChan()
|
||||
chS := engine.NewCacheS(cfg, nil, nil)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
close(chS.GetPrecacheChannel(utils.CacheActionPlans))
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
|
||||
|
||||
if schS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
schS.schS = &scheduler.Scheduler{}
|
||||
if !schS.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
serviceName := schS.ServiceName()
|
||||
if !reflect.DeepEqual(serviceName, utils.SchedulerS) {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.SchedulerS, serviceName)
|
||||
}
|
||||
shouldRun := schS.ShouldRun()
|
||||
if !reflect.DeepEqual(shouldRun, false) {
|
||||
t.Errorf("\nExpecting <false>,\n Received <%+v>", shouldRun)
|
||||
}
|
||||
if !reflect.DeepEqual(schS.GetScheduler(), schS.schS) {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", schS.schS, schS.GetScheduler())
|
||||
}
|
||||
}
|
||||
@@ -92,9 +92,9 @@ func (srvMngr *ServiceManager) V1StartService(args ArgStartService, reply *strin
|
||||
case utils.MetaScheduler:
|
||||
// stop the service using the config
|
||||
srvMngr.Lock()
|
||||
srvMngr.cfg.SchedulerCfg().Enabled = true
|
||||
srvMngr.cfg.ActionSCfg().Enabled = true
|
||||
srvMngr.Unlock()
|
||||
srvMngr.cfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{}
|
||||
srvMngr.cfg.GetReloadChan(config.ActionSJson) <- struct{}{}
|
||||
default:
|
||||
err = errors.New(utils.UnsupportedServiceIDCaps)
|
||||
}
|
||||
@@ -111,9 +111,9 @@ func (srvMngr *ServiceManager) V1StopService(args ArgStartService, reply *string
|
||||
case utils.MetaScheduler:
|
||||
// stop the service using the config
|
||||
srvMngr.Lock()
|
||||
srvMngr.cfg.SchedulerCfg().Enabled = false
|
||||
srvMngr.cfg.ActionSCfg().Enabled = false
|
||||
srvMngr.Unlock()
|
||||
srvMngr.cfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{}
|
||||
srvMngr.cfg.GetReloadChan(config.ActionSJson) <- struct{}{}
|
||||
default:
|
||||
err = errors.New(utils.UnsupportedServiceIDCaps)
|
||||
}
|
||||
@@ -198,8 +198,6 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
go srvMngr.reloadService(utils.ResourceS)
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RouteSJson):
|
||||
go srvMngr.reloadService(utils.RouteS)
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN):
|
||||
go srvMngr.reloadService(utils.SchedulerS)
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN):
|
||||
go srvMngr.reloadService(utils.RALService)
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.ApierS):
|
||||
|
||||
Reference in New Issue
Block a user