mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Adding StatsQueueTriggered to enhance the information displayed in triggered actions for StatsQueue
This commit is contained in:
@@ -67,7 +67,7 @@ const (
|
||||
UNLIMITED = "*unlimited"
|
||||
)
|
||||
|
||||
type actionTypeFunc func(*Account, *StatsQueue, *Action) error
|
||||
type actionTypeFunc func(*Account, *StatsQueueTriggered, *Action) error
|
||||
|
||||
func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
switch typ {
|
||||
@@ -111,18 +111,19 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func logAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func logAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub != nil {
|
||||
body, _ := json.Marshal(ub)
|
||||
Logger.Info(fmt.Sprintf("Threshold hit, Balance: %s", body))
|
||||
}
|
||||
if sq != nil {
|
||||
Logger.Info(fmt.Sprintf("Threshold hit, StatsQueueId: %s, Metrics: %+v", sq.GetId(), sq.getStats()))
|
||||
body, _ := json.Marshal(sq)
|
||||
Logger.Info(fmt.Sprintf("Threshold hit, StatsQueue: %s", body))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func resetTriggersAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func resetTriggersAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -130,7 +131,7 @@ func resetTriggersAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func setRecurrentAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func setRecurrentAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -138,7 +139,7 @@ func setRecurrentAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func unsetRecurrentAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func unsetRecurrentAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -146,7 +147,7 @@ func unsetRecurrentAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func allowNegativeAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func allowNegativeAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -154,7 +155,7 @@ func allowNegativeAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func denyNegativeAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func denyNegativeAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -162,14 +163,14 @@ func denyNegativeAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func resetAccountAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func resetAccountAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
return genericReset(ub)
|
||||
}
|
||||
|
||||
func topupResetAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func topupResetAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -181,7 +182,7 @@ func topupResetAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return genericDebit(ub, a)
|
||||
}
|
||||
|
||||
func topupAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func topupAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -189,7 +190,7 @@ func topupAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return genericDebit(ub, a)
|
||||
}
|
||||
|
||||
func debitResetAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func debitResetAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -200,14 +201,14 @@ func debitResetAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return genericDebit(ub, a)
|
||||
}
|
||||
|
||||
func debitAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func debitAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
return genericDebit(ub, a)
|
||||
}
|
||||
|
||||
func resetCounterAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func resetCounterAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -220,7 +221,7 @@ func resetCounterAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func resetCountersAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func resetCountersAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -246,7 +247,7 @@ func genericDebit(ub *Account, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func enableUserAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func enableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -254,7 +255,7 @@ func enableUserAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func disableUserAction(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
func disableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
if ub == nil {
|
||||
return errors.New("Nil user balance")
|
||||
}
|
||||
@@ -271,25 +272,31 @@ func genericReset(ub *Account) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func callUrl(ub *Account, sq *StatsQueue, a *Action) (err error) {
|
||||
cfg := config.CgrConfig()
|
||||
if ub != nil {
|
||||
_, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, ub)
|
||||
}
|
||||
if sq != nil {
|
||||
_, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, sq)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Does not block for posts, no error reports
|
||||
func callUrlAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action) (err error) {
|
||||
var o interface{}
|
||||
if ub != nil {
|
||||
o = ub
|
||||
}
|
||||
if sq != nil {
|
||||
o = sq.GetId()
|
||||
o = sq
|
||||
}
|
||||
jsn, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
_, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn)
|
||||
return err
|
||||
}
|
||||
|
||||
// Does not block for posts, no error reports
|
||||
func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action) error {
|
||||
var o interface{}
|
||||
if ub != nil {
|
||||
o = ub
|
||||
}
|
||||
if sq != nil {
|
||||
o = sq
|
||||
}
|
||||
jsn, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
@@ -301,7 +308,7 @@ func callUrlAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
if _, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, o); err == nil {
|
||||
break // Success, no need to reinterate
|
||||
} else if i == 4 { // Last iteration, syslog the warning
|
||||
Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed calling url: [%s], error: [%s], balance: %s", a.ExtraParameters, err.Error(), jsn))
|
||||
Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed calling url: [%s], error: [%s], triggered: %s", a.ExtraParameters, err.Error(), jsn))
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(i) * time.Minute)
|
||||
@@ -312,7 +319,7 @@ func callUrlAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
}
|
||||
|
||||
// Mails the balance hitting the threshold towards predefined list of addresses
|
||||
func mailAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
func mailAsync(ub *Account, sq *StatsQueueTriggered, a *Action) error {
|
||||
cgrCfg := config.CgrConfig()
|
||||
params := strings.Split(a.ExtraParameters, string(utils.CSV_SEP))
|
||||
if len(params) == 0 {
|
||||
@@ -334,7 +341,8 @@ func mailAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
}
|
||||
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on Balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), balJsn))
|
||||
} else if sq != nil {
|
||||
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on StatsQueueId: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nStatsQueueId:\r\n\t%s\r\n\r\nMetrics:\r\n\t%+v\r\n\r\nYours faithfully,\r\nCGR CDR Stats Monitor\r\n", toAddrStr, sq.GetId(), time.Now(), sq.GetId(), sq.getStats()))
|
||||
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on StatsQueueId: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nStatsQueueId:\r\n\t%s\r\n\r\nMetrics:\r\n\t%+v\r\n\r\nTrigger:\r\n\t%+v\r\n\r\nYours faithfully,\r\nCGR CDR Stats Monitor\r\n",
|
||||
toAddrStr, sq.Id, time.Now(), sq.Id, sq.Metrics, sq.Trigger))
|
||||
}
|
||||
auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port
|
||||
go func() {
|
||||
@@ -345,7 +353,7 @@ func mailAsync(ub *Account, sq *StatsQueue, a *Action) error {
|
||||
if ub != nil {
|
||||
Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], BalanceId: %s", a.ExtraParameters, err.Error(), ub.Id))
|
||||
} else if sq != nil {
|
||||
Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], StatsQueueId: %s", a.ExtraParameters, err.Error(), sq.GetId()))
|
||||
Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], StatsQueueTriggeredId: %s", a.ExtraParameters, err.Error(), sq.Id))
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ type ActionTrigger struct {
|
||||
lastExecutionTime time.Time
|
||||
}
|
||||
|
||||
func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueue) (err error) {
|
||||
func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err error) {
|
||||
// check for min sleep time
|
||||
if at.Recurrent && !at.lastExecutionTime.IsZero() && time.Since(at.lastExecutionTime) < at.MinSleep {
|
||||
return
|
||||
|
||||
@@ -90,14 +90,14 @@ func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) {
|
||||
if strings.HasPrefix(at.ThresholdType, "*min_") {
|
||||
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
|
||||
if value <= at.ThresholdValue {
|
||||
at.Execute(nil, sq)
|
||||
at.Execute(nil, sq.Triggered(at))
|
||||
}
|
||||
}
|
||||
}
|
||||
if strings.HasPrefix(at.ThresholdType, "*max_") {
|
||||
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
|
||||
if value >= at.ThresholdValue {
|
||||
at.Execute(nil, sq)
|
||||
at.Execute(nil, sq.Triggered(at))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,3 +169,15 @@ func (sq *StatsQueue) getStats() map[string]float64 {
|
||||
func (sq *StatsQueue) GetId() string {
|
||||
return sq.conf.Id
|
||||
}
|
||||
|
||||
// Convert data into a struct which can be used in actions based on triggers hit
|
||||
func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered {
|
||||
return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at}
|
||||
}
|
||||
|
||||
// Struct to be passed to triggered actions
|
||||
type StatsQueueTriggered struct {
|
||||
Id string // StatsQueueId
|
||||
Metrics map[string]float64
|
||||
Trigger *ActionTrigger
|
||||
}
|
||||
|
||||
@@ -5,10 +5,9 @@
|
||||
export DH_VERBOSE=1
|
||||
|
||||
export GOPATH=$(CURDIR)
|
||||
#export GOPATH=/usr/local/goapps
|
||||
|
||||
PKGDIR=debian/cgrates
|
||||
SRCDIR=/usr/local/src/cgrates/cgrates.git/cgrates
|
||||
SRCDIR=src/github.com/cgrates/cgrates
|
||||
|
||||
%:
|
||||
dh $@
|
||||
@@ -30,10 +29,10 @@ binary-arch: clean
|
||||
cp $(SRCDIR)/data/conf/cgrates.cfg $(PKGDIR)/etc/cgrates/
|
||||
mkdir -p $(PKGDIR)/usr/share/cgrates
|
||||
cp -r $(SRCDIR)/data/* $(PKGDIR)/usr/share/cgrates/
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdrc/in
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdrc/out
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdre/csv
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdre/fwv
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrc/in
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrc/out
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdre/csv
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdre/fwv
|
||||
mkdir -p $(PKGDIR)/var/log/cgrates/history
|
||||
dh_strip
|
||||
dh_compress
|
||||
|
||||
Reference in New Issue
Block a user