Files
cgrates/engine/action.go
2016-01-25 10:16:22 +02:00

643 lines
19 KiB
Go

/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2015 ITsysCOM GmbH
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"encoding/json"
"errors"
"fmt"
"net/smtp"
"path"
"reflect"
"sort"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
/*
Structure to be filled for each tariff plan with the bonus value for received calls minutes.
*/
type Action struct {
Id string
ActionType string
BalanceType string
ExtraParameters string
Filter string
ExpirationString string // must stay as string because it can have relative values like 1month
Weight float64
Balance *Balance
}
const (
LOG = "*log"
RESET_TRIGGERS = "*reset_triggers"
SET_RECURRENT = "*set_recurrent"
UNSET_RECURRENT = "*unset_recurrent"
ALLOW_NEGATIVE = "*allow_negative"
DENY_NEGATIVE = "*deny_negative"
RESET_ACCOUNT = "*reset_account"
REMOVE_ACCOUNT = "*remove_account"
SET_BALANCE = "*set_balance"
REMOVE_BALANCE = "*remove_balance"
TOPUP_RESET = "*topup_reset"
TOPUP = "*topup"
DEBIT_RESET = "*debit_reset"
DEBIT = "*debit"
RESET_COUNTERS = "*reset_counters"
ENABLE_ACCOUNT = "*enable_account"
DISABLE_ACCOUNT = "*disable_account"
ENABLE_DISABLE_BALANCE = "*enable_disable_balance"
CALL_URL = "*call_url"
CALL_URL_ASYNC = "*call_url_async"
MAIL_ASYNC = "*mail_async"
UNLIMITED = "*unlimited"
CDRLOG = "*cdrlog"
SET_DDESTINATIONS = "*set_ddestinations"
TRANSFER_MONETARY_DEFAULT = "*transfer_monetary_default"
)
func (a *Action) Clone() *Action {
return &Action{
Id: a.Id,
ActionType: a.ActionType,
BalanceType: a.BalanceType,
ExtraParameters: a.ExtraParameters,
ExpirationString: a.ExpirationString,
Weight: a.Weight,
Balance: a.Balance.Clone(),
}
}
type actionTypeFunc func(*Account, *StatsQueueTriggered, *Action, Actions) error
func getActionFunc(typ string) (actionTypeFunc, bool) {
switch typ {
case LOG:
return logAction, true
case CDRLOG:
return cdrLogAction, true
case RESET_TRIGGERS:
return resetTriggersAction, true
case SET_RECURRENT:
return setRecurrentAction, true
case UNSET_RECURRENT:
return unsetRecurrentAction, true
case ALLOW_NEGATIVE:
return allowNegativeAction, true
case DENY_NEGATIVE:
return denyNegativeAction, true
case RESET_ACCOUNT:
return resetAccountAction, true
case TOPUP_RESET:
return topupResetAction, true
case TOPUP:
return topupAction, true
case DEBIT_RESET:
return debitResetAction, true
case DEBIT:
return debitAction, true
case RESET_COUNTERS:
return resetCountersAction, true
case ENABLE_ACCOUNT:
return enableUserAction, true
case DISABLE_ACCOUNT:
return disableUserAction, true
case ENABLE_DISABLE_BALANCE:
return enableDisableBalanceAction, true
case CALL_URL:
return callUrl, true
case CALL_URL_ASYNC:
return callUrlAsync, true
case MAIL_ASYNC:
return mailAsync, true
case SET_DDESTINATIONS:
return setddestinations, true
case REMOVE_ACCOUNT:
return removeAccountAction, true
case REMOVE_BALANCE:
return removeBalanceAction, true
case SET_BALANCE:
return setBalanceAction, true
case TRANSFER_MONETARY_DEFAULT:
return transferMonetaryDefaultAction, true
}
return nil, false
}
func logAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub != nil {
body, _ := json.Marshal(ub)
utils.Logger.Info(fmt.Sprintf("Threshold hit, Balance: %s", body))
}
if sq != nil {
body, _ := json.Marshal(sq)
utils.Logger.Info(fmt.Sprintf("Threshold hit, StatsQueue: %s", body))
}
return
}
// Used by cdrLogAction to dynamically parse values out of account and action
func parseTemplateValue(rsrFlds utils.RSRFields, acnt *Account, action *Action) string {
dta, err := utils.NewTAFromAccountKey(acnt.Id) // Account information should be valid
if err != nil {
dta = new(utils.TenantAccount) // Init with empty values
}
var parsedValue string // Template values
for _, rsrFld := range rsrFlds {
switch rsrFld.Id {
case "AccountID":
parsedValue += rsrFld.ParseValue(acnt.Id)
case "Directions":
parsedValue += rsrFld.ParseValue(action.Balance.Directions.String())
case utils.TENANT:
parsedValue += rsrFld.ParseValue(dta.Tenant)
case utils.ACCOUNT:
parsedValue += rsrFld.ParseValue(dta.Account)
case "ActionID":
parsedValue += rsrFld.ParseValue(action.Id)
case "ActionType":
parsedValue += rsrFld.ParseValue(action.ActionType)
case "BalanceType":
parsedValue += rsrFld.ParseValue(action.BalanceType)
case "BalanceUUID":
parsedValue += rsrFld.ParseValue(action.Balance.Uuid)
case "BalanceID":
parsedValue += rsrFld.ParseValue(action.Balance.Id)
case "BalanceValue":
parsedValue += rsrFld.ParseValue(strconv.FormatFloat(action.Balance.GetValue(), 'f', -1, 64))
case "DestinationIDs":
parsedValue += rsrFld.ParseValue(action.Balance.DestinationIds.String())
case "ExtraParameters":
parsedValue += rsrFld.ParseValue(action.ExtraParameters)
case "RatingSubject":
parsedValue += rsrFld.ParseValue(action.Balance.RatingSubject)
case utils.CATEGORY:
parsedValue += rsrFld.ParseValue(action.Balance.Categories.String())
case "SharedGroups":
parsedValue += rsrFld.ParseValue(action.Balance.SharedGroups.String())
default:
parsedValue += rsrFld.ParseValue("") // Mostly for static values
}
}
return parsedValue
}
func cdrLogAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
defaultTemplate := map[string]utils.RSRFields{
utils.TOR: utils.ParseRSRFieldsMustCompile("BalanceType", utils.INFIELD_SEP),
utils.CDRHOST: utils.ParseRSRFieldsMustCompile("^127.0.0.1", utils.INFIELD_SEP),
utils.DIRECTION: utils.ParseRSRFieldsMustCompile("Directions", utils.INFIELD_SEP),
utils.REQTYPE: utils.ParseRSRFieldsMustCompile("^"+utils.META_PREPAID, utils.INFIELD_SEP),
utils.TENANT: utils.ParseRSRFieldsMustCompile(utils.TENANT, utils.INFIELD_SEP),
utils.ACCOUNT: utils.ParseRSRFieldsMustCompile(utils.ACCOUNT, utils.INFIELD_SEP),
utils.SUBJECT: utils.ParseRSRFieldsMustCompile(utils.ACCOUNT, utils.INFIELD_SEP),
utils.COST: utils.ParseRSRFieldsMustCompile("BalanceValue", utils.INFIELD_SEP),
}
template := make(map[string]string)
// overwrite default template
if a.ExtraParameters != "" {
if err = json.Unmarshal([]byte(a.ExtraParameters), &template); err != nil {
return
}
for field, rsr := range template {
defaultTemplate[field], err = utils.ParseRSRFields(rsr, utils.INFIELD_SEP)
if err != nil {
return err
}
}
}
// set stored cdr values
var cdrs []*CDR
for _, action := range acs {
if !utils.IsSliceMember([]string{DEBIT, DEBIT_RESET, TOPUP, TOPUP_RESET}, action.ActionType) || action.Balance == nil {
continue // Only log specific actions
}
cdr := &CDR{RunID: action.ActionType, Source: CDRLOG, SetupTime: time.Now(), AnswerTime: time.Now(), OriginID: utils.GenUUID(), ExtraFields: make(map[string]string)}
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.SetupTime.String())
cdr.Usage = time.Duration(1) * time.Second
elem := reflect.ValueOf(cdr).Elem()
for key, rsrFlds := range defaultTemplate {
parsedValue := parseTemplateValue(rsrFlds, acc, action)
field := elem.FieldByName(key)
if field.IsValid() && field.CanSet() {
switch field.Kind() {
case reflect.Float64:
value, err := strconv.ParseFloat(parsedValue, 64)
if err != nil {
continue
}
field.SetFloat(value)
case reflect.String:
field.SetString(parsedValue)
}
} else { // invalid fields go in extraFields of CDR
cdr.ExtraFields[key] = parsedValue
}
}
cdrs = append(cdrs, cdr)
if cdrStorage == nil { // Only save if the cdrStorage is defined
continue
}
if err := cdrStorage.SetCDR(cdr, true); err != nil {
return err
}
}
b, _ := json.Marshal(cdrs)
a.ExpirationString = string(b) // testing purpose only
return
}
func resetTriggersAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.ResetActionTriggers(a)
return
}
func setRecurrentAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.SetRecurrent(a, true)
return
}
func unsetRecurrentAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.SetRecurrent(a, false)
return
}
func allowNegativeAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.AllowNegative = true
return
}
func denyNegativeAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.AllowNegative = false
return
}
func resetAccountAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
return genericReset(ub)
}
func topupResetAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
if ub.BalanceMap == nil { // Init the map since otherwise will get error if nil
ub.BalanceMap = make(map[string]BalanceChain, 0)
}
c := a.Clone()
genericMakeNegative(c)
return genericDebit(ub, c, true)
}
func topupAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
c := a.Clone()
genericMakeNegative(c)
return genericDebit(ub, c, false)
}
func debitResetAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
if ub.BalanceMap == nil { // Init the map since otherwise will get error if nil
ub.BalanceMap = make(map[string]BalanceChain, 0)
}
return genericDebit(ub, a, true)
}
func debitAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
err = genericDebit(ub, a, false)
return
}
func resetCountersAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
if ub.UnitCounters != nil {
ub.UnitCounters.resetCounters(a)
}
return
}
func genericMakeNegative(a *Action) {
if a.Balance != nil && a.Balance.GetValue() >= 0 { // only apply if not allready negative
a.Balance.SetValue(-a.Balance.GetValue())
}
}
func genericDebit(ub *Account, a *Action, reset bool) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
if ub.BalanceMap == nil {
ub.BalanceMap = make(map[string]BalanceChain)
}
return ub.debitBalanceAction(a, reset)
}
func enableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.Disabled = false
return
}
func disableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.Disabled = true
return
}
func enableDisableBalanceAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil user balance")
}
ub.enableDisableBalanceAction(a)
return
}
func genericReset(ub *Account) error {
for k, _ := range ub.BalanceMap {
ub.BalanceMap[k] = BalanceChain{&Balance{Value: 0}}
}
ub.InitCounters()
ub.ResetActionTriggers(nil)
return nil
}
func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var o interface{}
if ub != nil {
o = ub
}
if sq != nil {
o = sq
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
_, err := utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 1, fallbackPath)
return err
}
// Does not block for posts, no error reports
func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var o interface{}
if ub != nil {
o = ub
}
if sq != nil {
o = sq
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 3, fallbackPath)
return nil
}
// Mails the balance hitting the threshold towards predefined list of addresses
func mailAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
cgrCfg := config.CgrConfig()
params := strings.Split(a.ExtraParameters, string(utils.CSV_SEP))
if len(params) == 0 {
return errors.New("Unconfigured parameters for mail action")
}
toAddrs := strings.Split(params[0], string(utils.FALLBACK_SEP))
toAddrStr := ""
for idx, addr := range toAddrs {
if idx != 0 {
toAddrStr += ", "
}
toAddrStr += addr
}
var message []byte
if ub != nil {
balJsn, err := json.Marshal(ub)
if err != nil {
return err
}
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on Balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), balJsn))
} else if sq != nil {
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on StatsQueueId: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nStatsQueueId:\r\n\t%s\r\n\r\nMetrics:\r\n\t%+v\r\n\r\nTrigger:\r\n\t%+v\r\n\r\nYours faithfully,\r\nCGR CDR Stats Monitor\r\n",
toAddrStr, sq.Id, time.Now(), sq.Id, sq.Metrics, sq.Trigger))
}
auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port
go func() {
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if err := smtp.SendMail(cgrCfg.MailerServer, auth, cgrCfg.MailerFromAddr, toAddrs, message); err == nil {
break
} else if i == 4 {
if ub != nil {
utils.Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], BalanceId: %s", a.ExtraParameters, err.Error(), ub.Id))
} else if sq != nil {
utils.Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], StatsQueueTriggeredId: %s", a.ExtraParameters, err.Error(), sq.Id))
}
break
}
time.Sleep(time.Duration(i) * time.Minute)
}
}()
return nil
}
func setddestinations(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var ddcDestId string
for _, bchain := range ub.BalanceMap {
for _, b := range bchain {
for destId := range b.DestinationIds {
if strings.HasPrefix(destId, "*ddc") {
ddcDestId = destId
break
}
}
if ddcDestId != "" {
break
}
}
if ddcDestId != "" {
break
}
}
if ddcDestId != "" {
// make slice from prefixes
prefixes := make([]string, len(sq.Metrics))
i := 0
for p := range sq.Metrics {
prefixes[i] = p
i++
}
// update destid in storage
ratingStorage.SetDestination(&Destination{Id: ddcDestId, Prefixes: prefixes})
// remove existing from cache
CleanStalePrefixes([]string{ddcDestId})
// update new values from redis
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.DESTINATION_PREFIX: []string{utils.DESTINATION_PREFIX + ddcDestId}})
} else {
return utils.ErrNotFound
}
return nil
}
func removeAccountAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var accID string
if ub != nil {
accID = ub.Id
} else {
accountInfo := struct {
Tenant string
Account string
}{}
if a.ExtraParameters != "" {
if err := json.Unmarshal([]byte(a.ExtraParameters), &accountInfo); err != nil {
return err
}
}
accID = utils.AccountKey(accountInfo.Tenant, accountInfo.Account)
}
if accID == "" {
return utils.ErrInvalidKey
}
if err := accountingStorage.RemoveAccount(accID); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accID, err))
return err
}
// clean the account id from all action plans
allAPs, err := ratingStorage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err))
return err
}
for key, ap := range allAPs {
if _, exists := ap.AccountIDs[accID]; !exists {
_, err := Guardian.Guard(func() (interface{}, error) {
// save action plan
ratingStorage.SetActionPlan(key, ap)
// cache
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
return 0, nil
}, 0, utils.ACTION_PLAN_PREFIX)
if err != nil {
return err
}
}
}
return nil
}
func removeBalanceAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
if _, exists := ub.BalanceMap[a.BalanceType]; !exists {
return utils.ErrNotFound
}
bChain := ub.BalanceMap[a.BalanceType]
found := false
for i := 0; i < len(bChain); i++ {
if bChain[i].MatchFilter(a.Balance, false) {
// delete without preserving order
bChain[i] = bChain[len(bChain)-1]
bChain = bChain[:len(bChain)-1]
i -= 1
found = true
}
}
ub.BalanceMap[a.BalanceType] = bChain
if !found {
return utils.ErrNotFound
}
return nil
}
func setBalanceAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
return acc.setBalanceAction(a)
}
func transferMonetaryDefaultAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
if acc == nil {
utils.Logger.Err("*transfer_monetary_default called without account")
return utils.ErrAccountNotFound
}
if _, exists := acc.BalanceMap[utils.MONETARY]; !exists {
return utils.ErrNotFound
}
defaultBalance := acc.GetDefaultMoneyBalance()
bChain := acc.BalanceMap[utils.MONETARY]
for _, balance := range bChain {
if balance.Uuid != defaultBalance.Uuid &&
balance.Id != defaultBalance.Id && // extra caution
balance.MatchFilter(a.Balance, false) {
if balance.Value > 0 {
defaultBalance.Value += balance.Value
balance.Value = 0
}
}
}
return nil
}
// Structure to store actions according to weight
type Actions []*Action
func (apl Actions) Len() int {
return len(apl)
}
func (apl Actions) Swap(i, j int) {
apl[i], apl[j] = apl[j], apl[i]
}
// we need higher weights earlyer in the list
func (apl Actions) Less(j, i int) bool {
return apl[i].Weight < apl[j].Weight
}
func (apl Actions) Sort() {
sort.Sort(apl)
}