Added reset threshold API. Fixes #1573

This commit is contained in:
Trial97
2020-11-09 09:33:54 +02:00
committed by Dan Christian Bogos
parent a33806f1d3
commit 3849dd325c
37 changed files with 242 additions and 86 deletions

View File

@@ -25,12 +25,12 @@ import (
"github.com/cgrates/cgrates/utils"
)
// NewThresholdSV1 initializes ThresholdSV1
// NewThresholdSv1 initializes ThresholdSV1
func NewThresholdSv1(tS *engine.ThresholdService) *ThresholdSv1 {
return &ThresholdSv1{tS: tS}
}
// Exports RPC from RLs
// ThresholdSv1 exports RPC from RLs
type ThresholdSv1 struct {
tS *engine.ThresholdService
}
@@ -60,6 +60,11 @@ func (tSv1 *ThresholdSv1) ProcessEvent(args *engine.ThresholdsArgsProcessEvent,
return tSv1.tS.V1ProcessEvent(args, tIDs)
}
// ResetThreshold resets the threshold hits
func (tSv1 *ThresholdSv1) ResetThreshold(tntID *utils.TenantIDWithOpts, reply *string) error {
return tSv1.tS.V1ResetThreshold(tntID.TenantID, reply)
}
// GetThresholdProfile returns a Threshold Profile
func (apierSv1 *APIerSv1) GetThresholdProfile(arg *utils.TenantID, reply *engine.ThresholdProfile) (err error) {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
@@ -69,11 +74,11 @@ func (apierSv1 *APIerSv1) GetThresholdProfile(arg *utils.TenantID, reply *engine
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
if th, err := apierSv1.DataManager.GetThresholdProfile(tnt, arg.ID, true, true, utils.NonTransactional); err != nil {
th, err := apierSv1.DataManager.GetThresholdProfile(tnt, arg.ID, true, true, utils.NonTransactional)
if err != nil {
return utils.APIErrorHandler(err)
} else {
*reply = *th
}
*reply = *th
return
}
@@ -153,7 +158,7 @@ func (apierSv1 *APIerSv1) SetThresholdProfile(args *engine.ThresholdWithCache, r
return nil
}
// Remove a specific Threshold Profile
// RemoveThresholdProfile removes a specific Threshold Profile
func (apierSv1 *APIerSv1) RemoveThresholdProfile(args *utils.TenantIDWithCache, reply *string) error {
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
@@ -188,6 +193,7 @@ func (apierSv1 *APIerSv1) RemoveThresholdProfile(args *utils.TenantIDWithCache,
return nil
}
// Ping .
func (tSv1 *ThresholdSv1) Ping(ign *utils.CGREventWithOpts, reply *string) error {
*reply = utils.Pong
return nil

View File

@@ -245,6 +245,7 @@ var (
testV1TSProcessEventWithoutTenant,
testV1TSGetThresholdsWithoutTenant,
testV1TSProcessAccountUpdateEvent,
testV1TSResetThresholdsWithoutTenant,
testV1TSStopEngine,
}
)
@@ -944,3 +945,39 @@ func testV1TSProcessAccountUpdateEvent(t *testing.T) {
}
}
func testV1TSResetThresholdsWithoutTenant(t *testing.T) {
expectedThreshold := &engine.Threshold{
Tenant: "cgrates.org",
ID: "THD_ACNT_BALANCE_1",
Hits: 1,
}
var reply *engine.Threshold
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThreshold,
&utils.TenantIDWithOpts{TenantID: &utils.TenantID{ID: "THD_ACNT_BALANCE_1"}},
&reply); err != nil {
t.Fatal(err)
}
reply.Snooze = expectedThreshold.Snooze
if !reflect.DeepEqual(expectedThreshold, reply) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expectedThreshold), utils.ToJSON(reply))
}
var result string
if err := tSv1Rpc.Call(utils.ThresholdSv1ResetThreshold,
&utils.TenantIDWithOpts{TenantID: &utils.TenantID{ID: "THD_ACNT_BALANCE_1"}},
&result); err != nil {
t.Fatal(err)
} else if result != utils.OK {
t.Errorf("Expected %+v \n, received %+v", utils.OK, result)
}
expectedThreshold.Hits = 0
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThreshold,
&utils.TenantIDWithOpts{TenantID: &utils.TenantID{ID: "THD_ACNT_BALANCE_1"}},
&reply); err != nil {
t.Fatal(err)
}
reply.Snooze = expectedThreshold.Snooze
if !reflect.DeepEqual(expectedThreshold, reply) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expectedThreshold), utils.ToJSON(reply))
}
}

View File

@@ -64,6 +64,7 @@ var (
testV2CDRsSetThreshold,
testV2CDRsProcessCDRWithThreshold,
testV2CDRsGetThreshold,
testV2CDRsResetThresholdAction,
testv2CDRsGetCDRsDest,
@@ -1167,3 +1168,26 @@ func testV2CDRsKillEngine(t *testing.T) {
t.Error(err)
}
}
func testV2CDRsResetThresholdAction(t *testing.T) {
var reply string
if err := cdrsRpc.Call(utils.APIerSv2SetActions, &utils.AttrSetActions{
ActionsId: "ACT_RESET_THD",
Actions: []*utils.TPAction{{Identifier: utils.MetaResetThreshold, ExtraParameters: "cgrates.org:THD_Test"}},
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Calling APIerSv2.SetActions received: %s", reply)
}
attrs := utils.AttrExecuteAction{Tenant: "cgrates.org", ActionsId: "ACT_RESET_THD"}
if err := cdrsRpc.Call(utils.APIerSv1ExecuteAction, attrs, &reply); err != nil {
t.Error(err)
}
var td engine.Threshold
if err := cdrsRpc.Call(utils.ThresholdSv1GetThreshold,
&utils.TenantIDWithOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Test"}}, &td); err != nil {
t.Error(err)
} else if td.Hits != 0 {
t.Errorf("Expecting threshold to be reset received: %v", td.Hits)
}
}

View File

@@ -95,7 +95,7 @@ func (aCfg *ApierCfg) AsMapInterface() (initialMap map[string]interface{}) {
cachesConns := make([]string, len(aCfg.CachesConns))
for i, item := range aCfg.CachesConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches) {
cachesConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaCaches, utils.EmptyString)
cachesConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaCaches)
} else {
cachesConns[i] = item
}
@@ -106,7 +106,7 @@ func (aCfg *ApierCfg) AsMapInterface() (initialMap map[string]interface{}) {
schedulerConns := make([]string, len(aCfg.SchedulerConns))
for i, item := range aCfg.SchedulerConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler) {
schedulerConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler, utils.EmptyString)
schedulerConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler)
} else {
schedulerConns[i] = item
}
@@ -117,7 +117,7 @@ func (aCfg *ApierCfg) AsMapInterface() (initialMap map[string]interface{}) {
attributeSConns := make([]string, len(aCfg.AttributeSConns))
for i, item := range aCfg.AttributeSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) {
attributeSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attributeSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attributeSConns[i] = item
}
@@ -128,7 +128,7 @@ func (aCfg *ApierCfg) AsMapInterface() (initialMap map[string]interface{}) {
eesConns := make([]string, len(aCfg.EEsConns))
for i, item := range aCfg.EEsConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs) {
eesConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaEEs, utils.EmptyString)
eesConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaEEs)
} else {
eesConns[i] = item
}

View File

@@ -167,7 +167,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
chargerSConns := make([]string, len(cdrscfg.ChargerSConns))
for i, item := range cdrscfg.ChargerSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers) {
chargerSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaChargers, utils.EmptyString)
chargerSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaChargers)
} else {
chargerSConns[i] = item
}
@@ -178,7 +178,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
raterConns := make([]string, len(cdrscfg.RaterConns))
for i, item := range cdrscfg.RaterConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResponder) {
raterConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder, utils.EmptyString)
raterConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder)
} else {
raterConns[i] = item
}
@@ -189,7 +189,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
attributeSConns := make([]string, len(cdrscfg.AttributeSConns))
for i, item := range cdrscfg.AttributeSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) {
attributeSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attributeSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attributeSConns[i] = item
}
@@ -200,7 +200,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
thresholdSConns := make([]string, len(cdrscfg.ThresholdSConns))
for i, item := range cdrscfg.ThresholdSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
thresholdSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds, utils.EmptyString)
thresholdSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
thresholdSConns[i] = item
}
@@ -211,7 +211,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
statSConns := make([]string, len(cdrscfg.StatSConns))
for i, item := range cdrscfg.StatSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS) {
statSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS, utils.EmptyString)
statSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS)
} else {
statSConns[i] = item
}
@@ -222,7 +222,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
schedulerConns := make([]string, len(cdrscfg.SchedulerConns))
for i, item := range cdrscfg.SchedulerConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler) {
schedulerConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler, utils.EmptyString)
schedulerConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler)
} else {
schedulerConns[i] = item
}
@@ -233,7 +233,7 @@ func (cdrscfg *CdrsCfg) AsMapInterface() (initialMP map[string]interface{}) {
eesConns := make([]string, len(cdrscfg.EEsConns))
for i, item := range cdrscfg.EEsConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs) {
eesConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaEEs, utils.EmptyString)
eesConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaEEs)
} else {
eesConns[i] = item
}

View File

@@ -93,7 +93,7 @@ func (cS *ChargerSCfg) AsMapInterface() (initialMP map[string]interface{}) {
attributeSConns := make([]string, len(cS.AttributeSConns))
for i, item := range cS.AttributeSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) {
attributeSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attributeSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attributeSConns[i] = item
}

View File

@@ -512,7 +512,7 @@ func (cfg *CGRConfig) loadSchedulerCfg(jsnCfg *CgrJsonCfg) (err error) {
if jsnSchedCfg, err = jsnCfg.SchedulerJsonCfg(); err != nil {
return
}
return cfg.schedulerCfg.loadFromJsonCfg(jsnSchedCfg)
return cfg.schedulerCfg.loadFromJSONCfg(jsnSchedCfg)
}
// loadCdrsCfg loads the Cdrs section of the configuration

View File

@@ -219,6 +219,7 @@ const CGRATES_CFG_JSON = `
"schedulers": {
"enabled": false, // start Scheduler service: <true|false>
"cdrs_conns": [], // connections to CDRs for *cdrlog actions <""|*internal|$rpc_conns_id>
"thresholds_conns": [], // connections to ThresholdS for *reset_threshold action <""|*internal|$rpc_conns_id>
"filters": [], // only execute actions matching these filters
},

View File

@@ -302,9 +302,10 @@ func testCGRConfigReloadSchedulerS(t *testing.T) {
t.Errorf("Expected OK received: %s", reply)
}
expAttr := &SchedulerCfg{
Enabled: true,
CDRsConns: []string{utils.MetaLocalHost},
Filters: []string{},
Enabled: true,
CDRsConns: []string{utils.MetaLocalHost},
ThreshSConns: []string{},
Filters: []string{},
}
if !reflect.DeepEqual(expAttr, cfg.SchedulerCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SchedulerCfg()))

View File

@@ -624,9 +624,10 @@ func TestDfRalsJsonCfg(t *testing.T) {
func TestDfSchedulerJsonCfg(t *testing.T) {
eCfg := &SchedulerJsonCfg{
Enabled: utils.BoolPointer(false),
Cdrs_conns: &[]string{},
Filters: &[]string{},
Enabled: utils.BoolPointer(false),
Cdrs_conns: &[]string{},
Thresholds_conns: &[]string{},
Filters: &[]string{},
}
if cfg, err := dfCgrJSONCfg.SchedulerJsonCfg(); err != nil {
t.Error(err)

View File

@@ -405,9 +405,10 @@ func TestCgrCfgJSONDefaultsRALs(t *testing.T) {
func TestCgrCfgJSONDefaultsScheduler(t *testing.T) {
eSchedulerCfg := &SchedulerCfg{
Enabled: false,
CDRsConns: []string{},
Filters: []string{},
Enabled: false,
CDRsConns: []string{},
ThreshSConns: []string{},
Filters: []string{},
}
if !reflect.DeepEqual(cgrCfg.schedulerCfg, eSchedulerCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.schedulerCfg, eSchedulerCfg)
@@ -2257,9 +2258,10 @@ func TestDispatcherSConfig(t *testing.T) {
func TestSchedulerConfig(t *testing.T) {
expected := &SchedulerCfg{
Enabled: false,
CDRsConns: []string{},
Filters: []string{},
Enabled: false,
CDRsConns: []string{},
ThreshSConns: []string{},
Filters: []string{},
}
cgrConfig, err := NewDefaultCGRConfig()
if err != nil {
@@ -3878,9 +3880,10 @@ func TestV1GetConfigScheduler(t *testing.T) {
var reply map[string]interface{}
expected := map[string]interface{}{
SCHEDULER_JSN: map[string]interface{}{
utils.EnabledCfg: false,
utils.CDRsConnsCfg: []string{},
utils.FiltersCfg: []string{},
utils.EnabledCfg: false,
utils.CDRsConnsCfg: []string{},
utils.ThreshSConnsCfg: []string{},
utils.FiltersCfg: []string{},
},
}
if cfgCgr, err := NewDefaultCGRConfig(); err != nil {

View File

@@ -507,6 +507,14 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SchedulerS, connID)
}
}
for _, connID := range cfg.schedulerCfg.ThreshSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.SchedulerS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SchedulerS, connID)
}
}
}
// EventReader sanity checks
if cfg.ersCfg.Enabled {

View File

@@ -146,7 +146,7 @@ func (ds *DiameterAgentCfg) AsMapInterface(separator string) (initialMP map[stri
for i, item := range ds.SessionSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)
if item == buf {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -114,7 +114,7 @@ func (dps *DispatcherSCfg) AsMapInterface() (initialMP map[string]interface{}) {
attributeSConns := make([]string, len(dps.AttributeSConns))
for i, item := range dps.AttributeSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) {
attributeSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attributeSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attributeSConns[i] = item
}

View File

@@ -100,7 +100,7 @@ func (da *DNSAgentCfg) AsMapInterface(separator string) (initialMP map[string]in
sessionSConns := make([]string, len(da.SessionSConns))
for i, item := range da.SessionSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -76,7 +76,7 @@ func (fSCfg *FilterSCfg) AsMapInterface() (initialMP map[string]interface{}) {
statSConns := make([]string, len(fSCfg.StatSConns))
for i, item := range fSCfg.StatSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS) {
statSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS, utils.EmptyString)
statSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS)
} else {
statSConns[i] = item
}
@@ -87,7 +87,7 @@ func (fSCfg *FilterSCfg) AsMapInterface() (initialMP map[string]interface{}) {
resourceSConns := make([]string, len(fSCfg.ResourceSConns))
for i, item := range fSCfg.ResourceSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources) {
resourceSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources, utils.EmptyString)
resourceSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources)
} else {
resourceSConns[i] = item
}
@@ -98,7 +98,7 @@ func (fSCfg *FilterSCfg) AsMapInterface() (initialMP map[string]interface{}) {
apierConns := make([]string, len(fSCfg.ApierSConns))
for i, item := range fSCfg.ApierSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaApier) {
apierConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaApier, utils.EmptyString)
apierConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaApier)
} else {
apierConns[i] = item
}

View File

@@ -116,7 +116,7 @@ func (ka *KamAgentCfg) AsMapInterface() (initialMP map[string]interface{}) {
sessionSConns := make([]string, len(ka.SessionSConns))
for i, item := range ka.SessionSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -130,9 +130,10 @@ type RalsJsonCfg struct {
// Scheduler config section
type SchedulerJsonCfg struct {
Enabled *bool
Cdrs_conns *[]string
Filters *[]string
Enabled *bool
Cdrs_conns *[]string
Thresholds_conns *[]string
Filters *[]string
}
// Cdrs config section

View File

@@ -240,7 +240,7 @@ func (l *LoaderSCfg) AsMapInterface(separator string) (initialMP map[string]inte
cacheSConns := make([]string, len(l.CacheSConns))
for i, item := range l.CacheSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches) {
cacheSConns[i] = strings.ReplaceAll(item, ":*caches", utils.EmptyString)
cacheSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaCaches)
} else {
cacheSConns[i] = item
}

View File

@@ -133,7 +133,7 @@ func (ra *RadiusAgentCfg) AsMapInterface(separator string) (initialMP map[string
sessionSConns := make([]string, len(ra.SessionSConns))
for i, item := range ra.SessionSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -124,7 +124,7 @@ func (ralsCfg *RalsCfg) AsMapInterface() (initialMP map[string]interface{}) {
threSholds := make([]string, len(ralsCfg.ThresholdSConns))
for i, item := range ralsCfg.ThresholdSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
threSholds[i] = strings.ReplaceAll(item, ":*thresholds", utils.EmptyString)
threSholds[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
threSholds[i] = item
}
@@ -135,7 +135,7 @@ func (ralsCfg *RalsCfg) AsMapInterface() (initialMP map[string]interface{}) {
statS := make([]string, len(ralsCfg.StatSConns))
for i, item := range ralsCfg.StatSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS) {
statS[i] = strings.ReplaceAll(item, ":*stats", utils.EmptyString)
statS[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS)
} else {
statS[i] = item
}
@@ -157,7 +157,7 @@ func (ralsCfg *RalsCfg) AsMapInterface() (initialMP map[string]interface{}) {
cacheSConns := make([]string, len(ralsCfg.CacheSConns))
for i, item := range ralsCfg.CacheSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches) {
cacheSConns[i] = strings.ReplaceAll(item, ":*caches", utils.EmptyString)
cacheSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaCaches)
} else {
cacheSConns[i] = item
}

View File

@@ -99,7 +99,7 @@ func (rlcfg *ResourceSConfig) AsMapInterface() (initialMP map[string]interface{}
thresholdSConns := make([]string, len(rlcfg.ThresholdSConns))
for i, item := range rlcfg.ThresholdSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
thresholdSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds, utils.EmptyString)
thresholdSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
thresholdSConns[i] = item
}

View File

@@ -155,7 +155,7 @@ func (rts *RouteSCfg) AsMapInterface() (initialMP map[string]interface{}) {
attributeSConns := make([]string, len(rts.AttributeSConns))
for i, item := range rts.AttributeSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) {
attributeSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attributeSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attributeSConns[i] = item
}
@@ -166,7 +166,7 @@ func (rts *RouteSCfg) AsMapInterface() (initialMP map[string]interface{}) {
ralSConns := make([]string, len(rts.RALsConns))
for i, item := range rts.RALsConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResponder) {
ralSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder, utils.EmptyString)
ralSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder)
} else {
ralSConns[i] = item
}
@@ -177,7 +177,7 @@ func (rts *RouteSCfg) AsMapInterface() (initialMP map[string]interface{}) {
resourceSConns := make([]string, len(rts.ResourceSConns))
for i, item := range rts.ResourceSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources) {
resourceSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources, utils.EmptyString)
resourceSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources)
} else {
resourceSConns[i] = item
}
@@ -188,7 +188,7 @@ func (rts *RouteSCfg) AsMapInterface() (initialMP map[string]interface{}) {
statSConns := make([]string, len(rts.StatSConns))
for i, item := range rts.StatSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS) {
statSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS, utils.EmptyString)
statSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS)
} else {
statSConns[i] = item
}

View File

@@ -25,12 +25,13 @@ import (
)
type SchedulerCfg struct {
Enabled bool
CDRsConns []string
Filters []string
Enabled bool
CDRsConns []string
ThreshSConns []string
Filters []string
}
func (schdcfg *SchedulerCfg) loadFromJsonCfg(jsnCfg *SchedulerJsonCfg) error {
func (schdcfg *SchedulerCfg) loadFromJSONCfg(jsnCfg *SchedulerJsonCfg) error {
if jsnCfg == nil {
return nil
}
@@ -54,6 +55,17 @@ func (schdcfg *SchedulerCfg) loadFromJsonCfg(jsnCfg *SchedulerJsonCfg) error {
schdcfg.Filters[i] = fltr
}
}
if jsnCfg.Thresholds_conns != nil {
schdcfg.ThreshSConns = make([]string, len(*jsnCfg.Thresholds_conns))
for idx, connID := range *jsnCfg.Thresholds_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if connID == utils.MetaInternal {
schdcfg.ThreshSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)
} else {
schdcfg.ThreshSConns[idx] = connID
}
}
}
return nil
}
@@ -66,12 +78,23 @@ func (schdcfg *SchedulerCfg) AsMapInterface() (initialMP map[string]interface{})
cdrsConns := make([]string, len(schdcfg.CDRsConns))
for i, item := range schdcfg.CDRsConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs) {
cdrsConns[i] = strings.ReplaceAll(item, ":*cdrs", utils.EmptyString)
cdrsConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaCDRs)
} else {
cdrsConns[i] = item
}
}
initialMP[utils.CDRsConnsCfg] = cdrsConns
}
if schdcfg.ThreshSConns != nil {
thrsConns := make([]string, len(schdcfg.ThreshSConns))
for i, item := range schdcfg.ThreshSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
thrsConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
thrsConns[i] = item
}
}
initialMP[utils.ThreshSConnsCfg] = thrsConns
}
return
}

View File

@@ -26,18 +26,20 @@ import (
func TestSchedulerCfgloadFromJsonCfg(t *testing.T) {
cfgJSONS := &SchedulerJsonCfg{
Enabled: utils.BoolPointer(true),
Cdrs_conns: &[]string{utils.MetaInternal, "*conn1"},
Filters: &[]string{"randomFilter"},
Enabled: utils.BoolPointer(true),
Cdrs_conns: &[]string{utils.MetaInternal, "*conn1"},
Thresholds_conns: &[]string{utils.MetaInternal, "*conn1"},
Filters: &[]string{"randomFilter"},
}
expected := &SchedulerCfg{
Enabled: true,
CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), "*conn1"},
Filters: []string{"randomFilter"},
Enabled: true,
CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), "*conn1"},
ThreshSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), "*conn1"},
Filters: []string{"randomFilter"},
}
if jsonCfg, err := NewDefaultCGRConfig(); err != nil {
t.Error(err)
} else if err = jsonCfg.schedulerCfg.loadFromJsonCfg(cfgJSONS); err != nil {
} else if err = jsonCfg.schedulerCfg.loadFromJSONCfg(cfgJSONS); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, jsonCfg.schedulerCfg) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(jsonCfg.schedulerCfg))
@@ -49,9 +51,10 @@ func TestSchedulerCfgAsMapInterface(t *testing.T) {
"schedulers": {},
}`
eMap := map[string]interface{}{
utils.EnabledCfg: false,
utils.CDRsConnsCfg: []string{},
utils.FiltersCfg: []string{},
utils.EnabledCfg: false,
utils.CDRsConnsCfg: []string{},
utils.ThreshSConnsCfg: []string{},
utils.FiltersCfg: []string{},
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -65,14 +68,16 @@ func TestSchedulerCfgAsMapInterface1(t *testing.T) {
cfgJSONStr := `{
"schedulers": {
"enabled": true,
"cdrs_conns": ["*internal:*cdrs", "*conn1"],
"cdrs_conns": ["*internal", "*conn1"],
"thresholds_conns": ["*internal", "*conn1"],
"filters": ["randomFilter"],
},
}`
eMap := map[string]interface{}{
utils.EnabledCfg: true,
utils.CDRsConnsCfg: []string{utils.MetaInternal, "*conn1"},
utils.FiltersCfg: []string{"randomFilter"},
utils.EnabledCfg: true,
utils.CDRsConnsCfg: []string{utils.MetaInternal, "*conn1"},
utils.ThreshSConnsCfg: []string{utils.MetaInternal, "*conn1"},
utils.FiltersCfg: []string{"randomFilter"},
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)

View File

@@ -334,7 +334,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
chargerSConns := make([]string, len(scfg.ChargerSConns))
for i, item := range scfg.ChargerSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers) {
chargerSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaChargers, utils.EmptyString)
chargerSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaChargers)
} else {
chargerSConns[i] = item
}
@@ -345,7 +345,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
RALsConns := make([]string, len(scfg.RALsConns))
for i, item := range scfg.RALsConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResponder) {
RALsConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder, utils.EmptyString)
RALsConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResponder)
} else {
RALsConns[i] = item
}
@@ -357,7 +357,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.ResSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)
if item == buf {
resSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources, utils.EmptyString)
resSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaResources)
} else {
resSConns[i] = item
}
@@ -369,7 +369,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.ThreshSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)
if item == buf {
threshSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds, utils.EmptyString)
threshSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
threshSConns[i] = item
}
@@ -381,7 +381,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.StatSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS)
if item == buf {
statSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS, utils.EmptyString)
statSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaStatS)
} else {
statSConns[i] = item
}
@@ -393,7 +393,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.RouteSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)
if item == buf {
routesConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaRoutes, utils.EmptyString)
routesConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaRoutes)
} else {
routesConns[i] = item
}
@@ -405,7 +405,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.AttrSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)
if item == buf {
attrSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes, utils.EmptyString)
attrSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaAttributes)
} else {
attrSConns[i] = item
}
@@ -417,7 +417,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.CDRsConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)
if item == buf {
CDRsConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaCDRs, utils.EmptyString)
CDRsConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaCDRs)
} else {
CDRsConns[i] = item
}
@@ -429,7 +429,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]interface{}) {
for i, item := range scfg.SchedulerConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler)
if item == buf {
schedulerConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler, utils.EmptyString)
schedulerConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaScheduler)
} else {
schedulerConns[i] = item
}
@@ -521,7 +521,7 @@ func (fscfg *FsAgentCfg) AsMapInterface(separator string) (initialMP map[string]
for i, item := range fscfg.SessionSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)
if item == buf {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}
@@ -669,7 +669,7 @@ func (aCfg *AsteriskAgentCfg) AsMapInterface() (initialMP map[string]interface{}
sessionSConns := make([]string, len(aCfg.SessionSConns))
for i, item := range aCfg.SessionSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -119,7 +119,7 @@ func (da *SIPAgentCfg) AsMapInterface(separator string) (initialMP map[string]in
sessionSConns := make([]string, len(da.SessionSConns))
for i, item := range da.SessionSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
sessionSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS)
} else {
sessionSConns[i] = item
}

View File

@@ -133,7 +133,7 @@ func (st *StatSCfg) AsMapInterface() (initialMP map[string]interface{}) {
thresholdSConns := make([]string, len(st.ThresholdSConns))
for i, item := range st.ThresholdSConns {
if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) {
thresholdSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds, utils.EmptyString)
thresholdSConns[i] = strings.TrimSuffix(item, utils.CONCATENATED_KEY_SEP+utils.MetaThresholds)
} else {
thresholdSConns[i] = item
}

View File

@@ -22,6 +22,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -22,6 +22,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -31,6 +31,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -22,6 +22,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -30,6 +30,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -22,6 +22,7 @@
"schedulers": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"cdrs": {

View File

@@ -104,6 +104,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
utils.MetaPostEvent: postEvent,
utils.MetaCDRAccount: resetAccountCDR,
utils.MetaExport: export,
utils.MetaResetThreshold: resetThreshold,
}
f, exists := actionFuncMap[typ]
return f, exists
@@ -1048,3 +1049,12 @@ func export(ub *Account, a *Action, acs Actions, extraData interface{}) (err err
return connMgr.Call(config.CgrConfig().ApierCfg().EEsConns, nil,
utils.EventExporterSv1ProcessEvent, args, &rply)
}
func resetThreshold(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) {
args := &utils.TenantIDWithOpts{
TenantID: utils.NewTenantID(a.ExtraParameters),
}
var rply string
return connMgr.Call(config.CgrConfig().SchedulerCfg().ThreshSConns, nil,
utils.ThresholdSv1ResetThreshold, args, &rply)
}

View File

@@ -481,3 +481,31 @@ func (tS *ThresholdService) Reload() {
func (tS *ThresholdService) StartLoop() {
go tS.runBackup()
}
// V1ResetThreshold resets the threshold hits
func (tS *ThresholdService) V1ResetThreshold(tntID *utils.TenantID, rply *string) (err error) {
var thd *Threshold
tnt := tntID.Tenant
if tnt == utils.EmptyString {
tnt = tS.cgrcfg.GeneralCfg().DefaultTenant
}
if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil {
return
}
if thd.Hits != 0 {
thd.Hits = 0
thd.Snooze = time.Time{}
thd.dirty = utils.BoolPointer(true) // mark it to be saved
if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 {
if err = tS.StoreThreshold(thd); err != nil {
return
}
} else {
tS.stMux.Lock()
tS.storedTdIDs[thd.TenantID()] = true
tS.stMux.Unlock()
}
}
*rply = utils.OK
return
}

View File

@@ -998,6 +998,7 @@ const (
MetaRemoveExpired = "*remove_expired"
MetaPostEvent = "*post_event"
MetaCDRAccount = "*reset_account_cdr"
MetaResetThreshold = "*reset_threshold"
ActionID = "ActionID"
ActionType = "ActionType"
ActionValue = "ActionValue"
@@ -1489,6 +1490,7 @@ const (
const (
ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent"
ThresholdSv1GetThreshold = "ThresholdSv1.GetThreshold"
ThresholdSv1ResetThreshold = "ThresholdSv1.ResetThreshold"
ThresholdSv1GetThresholdIDs = "ThresholdSv1.GetThresholdIDs"
ThresholdSv1Ping = "ThresholdSv1.Ping"
ThresholdSv1GetThresholdsForEvent = "ThresholdSv1.GetThresholdsForEvent"