Files
cgrates/engine/action.go
ionutboangiu 1f7e0b33a2 Update *alter_sessions action
Will now support two extra parameters: address and codec. For *internal
connections, the birpc.Service object will be retrieved from rpcParamsMap
from utils. Supported codecs: <*gob|*json|*http_jsonrpc> (ingored for
*internal address).

Action does not bother with setting defaults anymore, lets the API
handle them.

Improve action comments.

Add unit tests for the action. UnregisterRpcParams' implementation was
required.

Updated *alter_sessions action tariffplan for radius coa integration test
to include address and codec.

Some birpc clients that are set up in integration tests were still registering
a SessionSv1 object. Updated them to register an AgentV1 object instead.
2024-02-27 16:28:48 +01:00

1278 lines
40 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"html/template"
"net/http"
"net/smtp"
"reflect"
"slices"
"sort"
"strconv"
"strings"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/mitchellh/mapstructure"
)
// Action will be filled for each tariff plan with the bonus value for received calls minutes.
type Action struct {
Id string
ActionType string
ExtraParameters string
Filters []string
ExpirationString string // must stay as string because it can have relative values like 1month
Weight float64
Balance *BalanceFilter
balanceValue float64 // balance value after action execution, used with cdrlog
}
// Clone returns a clone of the action
func (a *Action) Clone() (cln *Action) {
if a == nil {
return
}
var fltrs []string
if a.Filters != nil {
fltrs = slices.Clone(a.Filters)
}
return &Action{
Id: a.Id,
ActionType: a.ActionType,
ExtraParameters: a.ExtraParameters,
Filters: fltrs,
ExpirationString: a.ExpirationString,
Weight: a.Weight,
Balance: a.Balance.Clone(),
}
}
type actionTypeFunc func(*Account, *Action, Actions, *FilterS, any) error
var actionFuncMap = make(map[string]actionTypeFunc)
func init() {
actionFuncMap[utils.MetaLog] = logAction
actionFuncMap[utils.MetaResetTriggers] = resetTriggersAction
actionFuncMap[utils.CDRLog] = cdrLogAction
actionFuncMap[utils.MetaSetRecurrent] = setRecurrentAction
actionFuncMap[utils.MetaUnsetRecurrent] = unsetRecurrentAction
actionFuncMap[utils.MetaAllowNegative] = allowNegativeAction
actionFuncMap[utils.MetaDenyNegative] = denyNegativeAction
actionFuncMap[utils.MetaResetAccount] = resetAccountAction
actionFuncMap[utils.MetaTopUpReset] = topupResetAction
actionFuncMap[utils.MetaTopUp] = topupAction
actionFuncMap[utils.MetaDebitReset] = debitResetAction
actionFuncMap[utils.MetaDebit] = debitAction
actionFuncMap[utils.MetaTransferBalance] = transferBalanceAction
actionFuncMap[utils.MetaResetCounters] = resetCountersAction
actionFuncMap[utils.MetaEnableAccount] = enableAccountAction
actionFuncMap[utils.MetaDisableAccount] = disableAccountAction
actionFuncMap[utils.MetaMailAsync] = mailAsync
actionFuncMap[utils.MetaSetDDestinations] = setddestinations
actionFuncMap[utils.MetaRemoveAccount] = removeAccountAction
actionFuncMap[utils.MetaRemoveBalance] = removeBalanceAction
actionFuncMap[utils.MetaSetBalance] = setBalanceAction
actionFuncMap[utils.MetaTransferMonetaryDefault] = transferMonetaryDefaultAction
actionFuncMap[utils.MetaCgrRpc] = cgrRPCAction
actionFuncMap[utils.MetaAlterSessions] = alterSessionsAction
actionFuncMap[utils.TopUpZeroNegative] = topupZeroNegativeAction
actionFuncMap[utils.SetExpiry] = setExpiryAction
actionFuncMap[utils.MetaPublishAccount] = publishAccount
actionFuncMap[utils.MetaRemoveSessionCosts] = removeSessionCosts
actionFuncMap[utils.MetaRemoveExpired] = removeExpired
actionFuncMap[utils.MetaCDRAccount] = resetAccountCDR
actionFuncMap[utils.MetaExport] = export
actionFuncMap[utils.MetaResetThreshold] = resetThreshold
actionFuncMap[utils.MetaResetStatQueue] = resetStatQueue
actionFuncMap[utils.MetaRemoteSetAccount] = remoteSetAccount
}
func getActionFunc(typ string) (f actionTypeFunc, exists bool) {
f, exists = actionFuncMap[typ]
return
}
func RegisterActionFunc(action string, f actionTypeFunc) {
actionFuncMap[action] = f
}
// transferBalanceAction transfers units between accounts' balances.
// It ensures both source and destination balances are of the same type and non-expired.
// Destination account and balance IDs are obtained from Action's ExtraParameters.
// ExtraParameters should be a JSON string containing keys 'DestAccountID' and 'DestBalanceID',
// which identify the destination account and balance for the transfer.
func transferBalanceAction(srcAcc *Account, act *Action, _ Actions, fltrS *FilterS, _ any) error {
if srcAcc == nil {
return errors.New("source account is nil")
}
if act.Balance.ID == nil {
return errors.New("source balance ID is missing")
}
if act.ExtraParameters == "" {
return errors.New("ExtraParameters used to identify the destination balance are missing")
}
if len(srcAcc.BalanceMap) == 0 {
return fmt.Errorf("account %s has no balances to transfer from", srcAcc.ID)
}
srcBalance := srcAcc.FindBalanceByID(*act.Balance.ID)
if srcBalance == nil || srcBalance.IsExpiredAt(time.Now()) {
return errors.New("source balance not found or expired")
}
transferUnits := act.Balance.GetValue()
if transferUnits == 0 {
return errors.New("balance value is missing or 0")
}
if srcBalance.ID != utils.MetaDefault && transferUnits > srcBalance.Value {
return utils.ErrInsufficientCredit
}
accDestInfo := struct {
DestinationAccountID string
DestinationBalanceID string
}{}
if err := json.Unmarshal([]byte(act.ExtraParameters), &accDestInfo); err != nil {
return err
}
// This guard is meant to lock the destination account as we are making changes to it. It was not needed
// for the source account due to it being locked from outside this function.
guardErr := guardian.Guardian.Guard(func() error {
destAcc, err := dm.GetAccount(accDestInfo.DestinationAccountID)
if err != nil {
return fmt.Errorf("retrieving destination account failed: %w", err)
}
destBalance := destAcc.FindBalanceByID(accDestInfo.DestinationBalanceID)
if destBalance == nil || destBalance.IsExpiredAt(time.Now()) {
return errors.New("destination balance not found or expired")
}
srcBalance.SubtractValue(transferUnits)
srcBalance.dirty = true
destBalance.AddValue(transferUnits)
destBalance.dirty = true
destAcc.InitCounters()
destAcc.ExecuteActionTriggers(act, fltrS)
if err := dm.SetAccount(destAcc); err != nil {
return fmt.Errorf("updating destination account failed: %w", err)
}
return nil
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.AccountPrefix+accDestInfo.DestinationAccountID)
if guardErr != nil {
return guardErr
}
// Execute action triggers for the source account. This account will be updated in the parent function.
srcAcc.InitCounters()
srcAcc.ExecuteActionTriggers(act, fltrS)
return nil
}
func logAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
switch {
case ub != nil:
body, _ := json.Marshal(ub)
utils.Logger.Info(fmt.Sprintf("LOG Account: %s", body))
case extraData != nil:
body, _ := json.Marshal(extraData)
utils.Logger.Info(fmt.Sprintf("LOG ExtraData: %s", body))
}
return
}
func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if len(config.CgrConfig().SchedulerCfg().CDRsConns) == 0 {
return errors.New("No connection with CDR Server")
}
defaultTemplate := map[string]config.RSRParsers{
utils.ToR: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.BalanceType, utils.InfieldSep),
utils.OriginHost: config.NewRSRParsersMustCompile("127.0.0.1", utils.InfieldSep),
utils.RequestType: config.NewRSRParsersMustCompile(utils.MetaNone, utils.InfieldSep),
utils.Tenant: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.Tenant, utils.InfieldSep),
utils.AccountField: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep),
utils.Subject: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep),
utils.Cost: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAct+utils.NestingSep+utils.ActionValue, utils.InfieldSep),
}
template := make(map[string]string)
// overwrite default template
if a.ExtraParameters != "" {
if err = json.Unmarshal([]byte(a.ExtraParameters), &template); err != nil {
return err
}
for field, rsr := range template {
if defaultTemplate[field], err = config.NewRSRParsers(rsr,
config.CgrConfig().GeneralCfg().RSRSep); err != nil {
return err
}
}
}
//In case that we have extra data we populate default templates
mapExtraData, _ := extraData.(map[string]any)
for key, val := range mapExtraData {
if defaultTemplate[key], err = config.NewRSRParsers(utils.IfaceAsString(val),
config.CgrConfig().GeneralCfg().RSRSep); err != nil {
return err
}
}
currentTime := time.Now()
// set stored cdr values
var cdrs []*CDR
for _, action := range acs {
if !slices.Contains(
[]string{utils.MetaDebit, utils.MetaDebitReset,
utils.MetaTopUp, utils.MetaTopUpReset,
utils.MetaSetBalance, utils.MetaRemoveBalance,
utils.MetaRemoveExpired, utils.MetaTransferBalance,
}, action.ActionType) || action.Balance == nil {
continue // Only log specific actions
}
cdr := &CDR{
RunID: action.ActionType,
Source: utils.CDRLog,
SetupTime: currentTime,
AnswerTime: currentTime,
OriginID: utils.GenUUID(),
ExtraFields: make(map[string]string),
PreRated: true,
Usage: time.Duration(1),
}
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost)
cdrLogProvider := newCdrLogProvider(acc, action)
elem := reflect.ValueOf(cdr).Elem()
for key, rsrFlds := range defaultTemplate {
parsedValue, err := rsrFlds.ParseDataProvider(cdrLogProvider)
if err != nil {
return err
}
field := elem.FieldByName(key)
if field.IsValid() && field.CanSet() {
switch field.Kind() {
case reflect.Float64:
value, err := strconv.ParseFloat(parsedValue, 64)
if err != nil {
continue
}
field.SetFloat(value)
case reflect.String:
field.SetString(parsedValue)
case reflect.Int64:
value, err := strconv.ParseInt(parsedValue, 10, 64)
if err != nil {
continue
}
field.SetInt(value)
}
} else { // invalid fields go in extraFields of CDR
cdr.ExtraFields[key] = parsedValue
}
}
// Function to process balances and append CDR if conditions are met.
processBalances := func(checkFunc func(*Balance) bool) error {
if acc == nil {
return fmt.Errorf("nil account for action %s", utils.ToJSON(action))
}
balanceChain, exists := acc.BalanceMap[action.Balance.GetType()]
if !exists {
return utils.ErrNotFound
}
found := false
for _, balance := range balanceChain {
if checkFunc(balance) {
// Create a new CDR instance for each balance that meets the condition.
newCDR := *cdr // Copy CDR's values to a new CDR instance.
newCDR.Cost = balance.Value
newCDR.OriginID = utils.GenUUID() // OriginID must be unique for every CDR.
newCDR.CGRID = utils.Sha1(newCDR.OriginID, newCDR.OriginHost)
newCDR.ExtraFields[utils.BalanceID] = balance.ID
cdrs = append(cdrs, &newCDR) // Append the address of the new instance.
found = true
}
}
if !found {
return utils.ErrNotFound
}
return nil
}
// If the action is of type *remove_balance or *remove_expired, for each matched balance,
// assign the balance values to the CDR cost and append to the list of CDRs.
switch action.ActionType {
case utils.MetaRemoveBalance:
if err = processBalances(func(b *Balance) bool {
return b.MatchFilter(action.Balance, false, false)
}); err != nil {
return err
}
continue
case utils.MetaRemoveExpired:
if err = processBalances(func(b *Balance) bool {
return b.IsExpiredAt(currentTime)
}); err != nil {
return err
}
continue
case utils.MetaTransferBalance:
cdr.Cost = action.Balance.GetValue()
cdr.Account = utils.SplitConcatenatedKey(acc.ID)[1] // Extract ID from TenantID.
accDestInfo := struct {
DestinationAccountID string
DestinationBalanceID string
}{}
if err := json.Unmarshal([]byte(action.ExtraParameters), &accDestInfo); err != nil {
return err
}
cdr.Destination = utils.SplitConcatenatedKey(accDestInfo.DestinationAccountID)[1] // Extract ID from TenantID.
cdr.ExtraFields[utils.SourceBalanceID] = *action.Balance.ID
cdr.ExtraFields[utils.DestinationBalanceID] = accDestInfo.DestinationBalanceID
}
cdrs = append(cdrs, cdr)
}
events := make([]*utils.CGREvent, 0, len(cdrs))
for _, cdr := range cdrs {
events = append(events, cdr.AsCGREvent())
}
var reply string
if err := connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().CDRsConns,
utils.CDRsV1ProcessEvents,
&ArgV1ProcessEvents{
Flags: []string{utils.ConcatenatedKey(utils.MetaChargers, "false")},
CGREvents: events,
}, &reply); err != nil {
return err
}
b, _ := json.Marshal(cdrs)
a.ExpirationString = string(b) // testing purpose only
return nil
}
func resetTriggersAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.ResetActionTriggers(a, fltrS)
return
}
func setRecurrentAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.SetRecurrent(a, true)
return
}
func unsetRecurrentAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.SetRecurrent(a, false)
return
}
func allowNegativeAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.AllowNegative = true
return
}
func denyNegativeAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.AllowNegative = false
return
}
func resetAccountAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
return genericReset(ub, fltrS)
}
func topupResetAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
if ub.BalanceMap == nil { // Init the map since otherwise will get error if nil
ub.BalanceMap = make(map[string]Balances)
}
c := a.Clone()
genericMakeNegative(c)
err = genericDebit(ub, c, true, fltrS)
a.balanceValue = c.balanceValue
return
}
func topupAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
c := a.Clone()
genericMakeNegative(c)
err = genericDebit(ub, c, false, fltrS)
a.balanceValue = c.balanceValue
return
}
func debitResetAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
if ub.BalanceMap == nil { // Init the map since otherwise will get error if nil
ub.BalanceMap = make(map[string]Balances)
}
return genericDebit(ub, a, true, fltrS)
}
func debitAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
err = genericDebit(ub, a, false, fltrS)
return
}
func resetCountersAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if ub == nil {
return errors.New("nil account")
}
if ub.UnitCounters != nil {
ub.UnitCounters.resetCounters(a)
}
return
}
func genericMakeNegative(a *Action) {
if a.Balance != nil && a.Balance.GetValue() > 0 { // only apply if not allready negative
a.Balance.SetValue(-a.Balance.GetValue())
}
}
func genericDebit(ub *Account, a *Action, reset bool, fltrS *FilterS) (err error) {
if ub == nil {
return errors.New("nil account")
}
if ub.BalanceMap == nil {
ub.BalanceMap = make(map[string]Balances)
}
return ub.debitBalanceAction(a, reset, false, fltrS)
}
func enableAccountAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if acc == nil {
return errors.New("nil account")
}
acc.Disabled = false
return
}
func disableAccountAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
if acc == nil {
return errors.New("nil account")
}
acc.Disabled = true
return
}
/*func enableDisableBalanceAction(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Actions) (err error) {
if ub == nil {
return errors.New("nil account")
}
ub.enableDisableBalanceAction(a)
return
}*/
func genericReset(ub *Account, fltrS *FilterS) error {
for k := range ub.BalanceMap {
ub.BalanceMap[k] = Balances{&Balance{Value: 0}}
}
ub.InitCounters()
ub.ResetActionTriggers(nil, fltrS)
return nil
}
// Mails the balance hitting the threshold towards predefined list of addresses
func mailAsync(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
cgrCfg := config.CgrConfig()
params := strings.Split(a.ExtraParameters, string(utils.CSVSep))
if len(params) == 0 {
return errors.New("Unconfigured parameters for mail action")
}
toAddrs := strings.Split(params[0], string(utils.FallbackSep))
toAddrStr := ""
for idx, addr := range toAddrs {
if idx != 0 {
toAddrStr += ", "
}
toAddrStr += addr
}
var message []byte
if ub != nil {
balJsn, err := json.Marshal(ub)
if err != nil {
return err
}
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on Balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.ID, time.Now(), balJsn))
}
var auth smtp.Auth
if len(cgrCfg.MailerCfg().MailerAuthUser) > 0 || len(cgrCfg.MailerCfg().MailerAuthPass) > 0 { //use auth if user/pass not empty in config
auth = smtp.PlainAuth("", cgrCfg.MailerCfg().MailerAuthUser, cgrCfg.MailerCfg().MailerAuthPass, strings.Split(cgrCfg.MailerCfg().MailerServer, ":")[0]) // We only need host part, so ignore port
}
go func() {
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if err := smtp.SendMail(cgrCfg.MailerCfg().MailerServer, auth, cgrCfg.MailerCfg().MailerFromAddr, toAddrs, message); err == nil {
break
} else if i == 4 {
if ub != nil {
utils.Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed emailing, params: [%s], error: [%s], BalanceId: %s", a.ExtraParameters, err.Error(), ub.ID))
}
break
}
time.Sleep(time.Duration(i) * time.Minute)
}
}()
return nil
}
func setddestinations(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
var ddcDestID string
for _, bchain := range ub.BalanceMap {
for _, b := range bchain {
for destID := range b.DestinationIDs {
if strings.HasPrefix(destID, utils.MetaDDC) {
ddcDestID = destID
break
}
}
if ddcDestID != "" {
break
}
}
if ddcDestID != "" {
break
}
}
if ddcDestID != "" {
destinations := utils.NewStringSet(nil)
for _, statID := range strings.Split(a.ExtraParameters, utils.InfieldSep) {
if statID == utils.EmptyString {
continue
}
var sts StatQueue
if err = connMgr.Call(context.TODO(), config.CgrConfig().RalsCfg().StatSConns,
utils.StatSv1GetStatQueue,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: config.CgrConfig().GeneralCfg().DefaultTenant,
ID: statID,
},
}, &sts); err != nil {
return
}
ddcIface, has := sts.SQMetrics[utils.MetaDDC]
if !has {
continue
}
ddcMetric := ddcIface.(*StatDDC)
// make slice from prefixes
// Review here prefixes
for p := range ddcMetric.FieldValues {
destinations.Add(p)
}
}
newDest := &Destination{Id: ddcDestID, Prefixes: destinations.AsSlice()}
oldDest, err := dm.GetDestination(ddcDestID, true, true, utils.NonTransactional)
if err != nil {
return err
}
// update destid in storage
if err = dm.SetDestination(newDest, utils.NonTransactional); err != nil {
return err
}
if err = dm.CacheDataFromDB(utils.DestinationPrefix, []string{ddcDestID}, true); err != nil {
return err
}
if err == nil && oldDest != nil {
if err = dm.UpdateReverseDestination(oldDest, newDest, utils.NonTransactional); err != nil {
return err
}
}
} else {
return utils.ErrNotFound
}
return nil
}
func removeAccountAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
var accID string
if ub != nil {
accID = ub.ID
} else {
accountInfo := struct {
Tenant string
Account string
}{}
if a.ExtraParameters != "" {
if err := json.Unmarshal([]byte(a.ExtraParameters), &accountInfo); err != nil {
return err
}
}
accID = utils.ConcatenatedKey(accountInfo.Tenant, accountInfo.Account)
}
if accID == "" {
return utils.ErrInvalidKey
}
if err := dm.RemoveAccount(accID); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accID, err))
return err
}
return guardian.Guardian.Guard(func() error {
acntAPids, err := dm.GetAccountActionPlans(accID, true, true, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err))
return err
}
for _, apID := range acntAPids {
ap, err := dm.GetActionPlan(apID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Err(fmt.Sprintf("Could not retrieve action plan: %s: %v", apID, err))
return err
}
delete(ap.AccountIDs, accID)
if err := dm.SetActionPlan(apID, ap, true, utils.NonTransactional); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not save action plan: %s: %v", apID, err))
return err
}
}
if err = dm.CacheDataFromDB(utils.ActionPlanPrefix, acntAPids, true); err != nil {
return err
}
if err = dm.RemAccountActionPlans(accID, nil); err != nil {
return err
}
if err = dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil && err.Error() != utils.ErrNotFound.Error() {
return err
}
return nil
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.ActionPlanPrefix)
}
func removeBalanceAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
if ub == nil {
return fmt.Errorf("nil account for %s action", utils.ToJSON(a))
}
if _, exists := ub.BalanceMap[a.Balance.GetType()]; !exists {
return utils.ErrNotFound
}
bChain := ub.BalanceMap[a.Balance.GetType()]
found := false
for i := 0; i < len(bChain); i++ {
if bChain[i].MatchFilter(a.Balance, false, false) {
// delete without preserving order
bChain[i] = bChain[len(bChain)-1]
bChain = bChain[:len(bChain)-1]
i--
found = true
}
}
ub.BalanceMap[a.Balance.GetType()] = bChain
if !found {
return utils.ErrNotFound
}
return nil
}
func setBalanceAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) error {
if ub == nil {
return fmt.Errorf("nil account for %s action", utils.ToJSON(a))
}
return ub.setBalanceAction(a, fltrS)
}
func transferMonetaryDefaultAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
if ub == nil {
utils.Logger.Err("*transfer_monetary_default called without account")
return utils.ErrAccountNotFound
}
if _, exists := ub.BalanceMap[utils.MetaMonetary]; !exists {
return utils.ErrNotFound
}
defaultBalance := ub.GetDefaultMoneyBalance()
bChain := ub.BalanceMap[utils.MetaMonetary]
for _, balance := range bChain {
if balance.Uuid != defaultBalance.Uuid &&
balance.ID != defaultBalance.ID && // extra caution
balance.MatchFilter(a.Balance, false, false) {
if balance.Value > 0 {
defaultBalance.Value += balance.Value
balance.Value = 0
}
}
}
return nil
}
// RPCRequest used by rpc action
type RPCRequest struct {
Address string
Transport string
Method string
Attempts int
Async bool
Params map[string]any
}
/*
<< .Object.Property >>
Property can be a attribute or a method both used without ()
Please also note the initial dot .
Currently there are following objects that can be used:
Account - the account that this action is called on
Action - the action with all it's attributs
Actions - the list of actions in the current action set
Sq - CDRStatsQueueTriggered object
We can actually use everythiong that go templates offer. You can read more here: https://golang.org/pkg/text/template/
*/
func cgrRPCAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
// parse template
tmpl := template.New("extra_params")
tmpl.Delims("<<", ">>")
if tmpl, err = tmpl.Parse(a.ExtraParameters); err != nil {
utils.Logger.Err(fmt.Sprintf("error parsing *cgr_rpc template: %s", err.Error()))
return
}
var buf bytes.Buffer
if err = tmpl.Execute(&buf, struct {
Account *Account
Action *Action
Actions Actions
ExtraData any
}{ub, a, acs, extraData}); err != nil {
utils.Logger.Err(fmt.Sprintf("error executing *cgr_rpc template %s:", err.Error()))
return
}
var req RPCRequest
if err = json.Unmarshal(buf.Bytes(), &req); err != nil {
return
}
var params *utils.RpcParams
if params, err = utils.GetRpcParams(req.Method); err != nil {
return
}
var client birpc.ClientConnector
if req.Address == utils.MetaInternal {
client = params.Object.(birpc.ClientConnector)
} else if client, err = rpcclient.NewRPCClient(context.TODO(), utils.TCP, req.Address, false, "", "", "",
req.Attempts, 0,
config.CgrConfig().GeneralCfg().MaxReconnectInterval,
utils.FibDuration,
config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout,
req.Transport, nil, false, nil); err != nil {
return
}
// Decode's output parameter requires a pointer.
if reflect.TypeOf(params.InParam).Kind() == reflect.Pointer {
err = mapstructure.Decode(req.Params, params.InParam)
} else {
err = mapstructure.Decode(req.Params, &params.InParam)
}
if err != nil {
utils.Logger.Info("<*cgr_rpc> err: " + err.Error())
return
}
if params.InParam == nil {
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> nil params err: req.Params: %+v params: %+v", req.Params, params))
return utils.ErrParserError
}
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> calling: %s with: %s and result %v", req.Method, utils.ToJSON(params.InParam), params.OutParam))
if !req.Async {
err = client.Call(context.TODO(), req.Method, params.InParam, params.OutParam)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(params.OutParam), err))
return
}
go func() {
err := client.Call(context.TODO(), req.Method, params.InParam, params.OutParam)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(params.OutParam), err))
}()
return
}
// alterSessionsAction processes the `ExtraParameters` field from the action to construct a request
// for the `SessionSv1.AlterSessions` API call.
//
// The ExtraParameters field format is expected as follows:
// - address: string, specifies the server address in the format host:port or *internal.
// - codec: string, specifies the encoding used for communication <*json|*gob|*http_jsonrpc>.
// - tenant: string
// - limit: integer, specifying the maximum number of sessions to alter.
// - filters: strings separated by "&".
// - APIOpts: set of key-value pairs (separated by "&").
// - Event: set of key-value pairs (separated by "&").
//
// Parameters are separated by ";" and must be provided in the specified order.
func alterSessionsAction(_ *Account, act *Action, _ Actions, _ *FilterS, _ any) (err error) {
// Parse action parameters based on the predefined format.
params := strings.Split(act.ExtraParameters, ";")
if len(params) != 7 {
return errors.New("invalid number of parameters; expected 7")
}
// Establish a client connection.
address := params[0]
codec := params[1]
var client birpc.ClientConnector
switch address {
case utils.MetaInternal:
// For internal connections, use the already registered SessionSv1 birpc.Service object.
var rpcParams *utils.RpcParams
rpcParams, err = utils.GetRpcParams(utils.SessionSv1AlterSessions)
if err != nil {
return fmt.Errorf("retrieving service for *internal calls failed: %w", err)
}
client = rpcParams.Object.(birpc.ClientConnector)
default:
// For external connections, create a new RPCClient.
client, err = rpcclient.NewRPCClient(context.TODO(), utils.TCP, address,
false, "", "", "", 1, 0,
config.CgrConfig().GeneralCfg().MaxReconnectInterval,
utils.FibDuration,
config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout,
codec, nil, false, nil)
if err != nil {
return fmt.Errorf("failed to init RPCClient: %w", err)
}
}
// If conversion fails, limit will default to 0.
limit, _ := strconv.Atoi(params[4])
// Prepare request arguments based on provided parameters.
attr := utils.SessionFilterWithEvent{
SessionFilter: &utils.SessionFilter{
Limit: &limit,
Tenant: params[2],
Filters: strings.Split(params[3], "&"),
APIOpts: make(map[string]any),
},
Event: make(map[string]any),
}
// Helper function to parse key-value pairs for API options and event data.
parseKVParams := func(paramStr string, targetMap map[string]any) error {
for _, tuple := range strings.Split(paramStr, "&") {
// Use strings.Cut to split 'tuple' into key-value pairs at the first occurrence of ':'.
// This ensures that additional ':' characters within the value do not affect parsing.
key, value, found := strings.Cut(tuple, ":")
if !found {
return fmt.Errorf("invalid key-value pair: %s", tuple)
}
targetMap[key] = value
}
return nil
}
if err := parseKVParams(params[5], attr.APIOpts); err != nil {
return err
}
if err := parseKVParams(params[6], attr.Event); err != nil {
return err
}
var reply string
return client.Call(context.Background(), utils.SessionSv1AlterSessions, attr, &reply)
}
func topupZeroNegativeAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) error {
if ub == nil {
return errors.New("nil account")
}
if ub.BalanceMap == nil {
ub.BalanceMap = make(map[string]Balances)
}
return ub.debitBalanceAction(a, false, true, fltrS)
}
func setExpiryAction(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
if ub == nil {
return errors.New("nil account")
}
balanceType := a.Balance.GetType()
for _, b := range ub.BalanceMap[balanceType] {
if b.MatchFilter(a.Balance, false, true) {
b.ExpirationDate = a.Balance.GetExpirationDate()
}
}
return nil
}
// publishAccount will publish the account as well as each balance received to ThresholdS
func publishAccount(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) error {
if ub == nil {
return errors.New("nil account")
}
initBal := make(map[string]float64)
for _, bals := range ub.BalanceMap {
for _, bal := range bals {
initBal[bal.Uuid] = bal.Value
}
}
ub.Publish(initBal)
return nil
}
// Actions used to store actions according to weight
type Actions []*Action
func (apl Actions) Len() int {
return len(apl)
}
func (apl Actions) Swap(i, j int) {
apl[i], apl[j] = apl[j], apl[i]
}
// we need higher weights earlyer in the list
func (apl Actions) Less(j, i int) bool {
return apl[i].Weight < apl[j].Weight
}
// Sort used to implement sort interface
func (apl Actions) Sort() {
sort.Sort(apl)
}
// Clone returns a clone from object
func (apl Actions) Clone() (any, error) {
if apl == nil {
return nil, nil
}
cln := make(Actions, len(apl))
for i, action := range apl {
cln[i] = action.Clone()
}
return cln, nil
}
// newCdrLogProvider constructs a DataProvider
func newCdrLogProvider(acnt *Account, action *Action) (dP utils.DataProvider) {
dP = &cdrLogProvider{acnt: acnt, action: action, cache: utils.MapStorage{}}
return
}
// cdrLogProvider implements utils.DataProvider so we can pass it to filters
type cdrLogProvider struct {
acnt *Account
action *Action
cache utils.MapStorage
}
// String is part of utils.DataProvider interface
// when called, it will display the already parsed values out of cache
func (cdrP *cdrLogProvider) String() string {
return utils.ToJSON(cdrP)
}
// FieldAsInterface is part of utils.DataProvider interface
func (cdrP *cdrLogProvider) FieldAsInterface(fldPath []string) (data any, err error) {
if data, err = cdrP.cache.FieldAsInterface(fldPath); err == nil ||
err != utils.ErrNotFound { // item found in cache
return
}
err = nil // cancel previous err
if len(fldPath) == 2 {
switch fldPath[0] {
case utils.MetaAcnt:
switch fldPath[1] {
case utils.AccountID:
data = cdrP.acnt.ID
case utils.Tenant:
tntAcnt := new(utils.TenantAccount) // Init with empty values
if cdrP.acnt != nil {
if tntAcnt, err = utils.NewTAFromAccountKey(cdrP.acnt.ID); err != nil {
return
}
}
data = tntAcnt.Tenant
case utils.AccountField:
tntAcnt := new(utils.TenantAccount) // Init with empty values
if cdrP.acnt != nil {
if tntAcnt, err = utils.NewTAFromAccountKey(cdrP.acnt.ID); err != nil {
return
}
}
data = tntAcnt.Account
case utils.BalanceType:
data = cdrP.action.Balance.GetType()
case utils.BalanceUUID:
data = cdrP.action.Balance.CreateBalance().Uuid
case utils.BalanceID:
data = cdrP.action.Balance.CreateBalance().ID
case utils.BalanceValue:
data = strconv.FormatFloat(cdrP.action.balanceValue, 'f', -1, 64)
case utils.DestinationIDs:
data = cdrP.action.Balance.CreateBalance().DestinationIDs.String()
case utils.ExtraParameters:
data = cdrP.action.ExtraParameters
case utils.RatingSubject:
data = cdrP.action.Balance.CreateBalance().RatingSubject
case utils.Category:
data = cdrP.action.Balance.Categories.String()
case utils.SharedGroups:
data = cdrP.action.Balance.SharedGroups.String()
}
case utils.MetaAct:
switch fldPath[1] {
case utils.ActionID:
data = cdrP.action.Id
case utils.ActionType:
data = cdrP.action.ActionType
case utils.ActionValue:
data = strconv.FormatFloat(cdrP.action.Balance.CreateBalance().GetValue(), 'f', -1, 64)
}
}
} else {
data = fldPath[0]
}
cdrP.cache.Set(fldPath, data)
return
}
// FieldAsString is part of utils.DataProvider interface
func (cdrP *cdrLogProvider) FieldAsString(fldPath []string) (data string, err error) {
var valIface any
valIface, err = cdrP.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(valIface), nil
}
func removeSessionCosts(_ *Account, action *Action, _ Actions, _ *FilterS, _ any) error { // FiltersID;inlineFilter
tenant := config.CgrConfig().GeneralCfg().DefaultTenant
smcFilter := new(utils.SMCostFilter)
for _, fltrID := range strings.Split(action.ExtraParameters, utils.InfieldSep) {
if len(fltrID) == 0 {
continue
}
fltr, err := dm.GetFilter(tenant, fltrID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Error: %s for filter: %s in action: <%s>",
utils.Actions, err.Error(), fltrID, utils.MetaRemoveSessionCosts))
continue
}
for _, rule := range fltr.Rules {
smcFilter, err = utils.AppendToSMCostFilter(smcFilter, rule.Type, rule.Element, rule.Values, config.CgrConfig().GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> %s in action: <%s>", utils.Actions, err.Error(), utils.MetaRemoveSessionCosts))
}
}
}
return cdrStorage.RemoveSMCosts(smcFilter)
}
func removeExpired(acc *Account, action *Action, _ Actions, _ *FilterS, extraData any) error {
if acc == nil {
return fmt.Errorf("nil account for %s action", utils.ToJSON(action))
}
bChain, exists := acc.BalanceMap[action.Balance.GetType()]
if !exists {
return utils.ErrNotFound
}
found := false
for i := 0; i < len(bChain); i++ {
if bChain[i].IsExpiredAt(time.Now()) {
// delete without preserving order
bChain[i] = bChain[len(bChain)-1]
bChain = bChain[:len(bChain)-1]
i--
found = true
}
}
acc.BalanceMap[action.Balance.GetType()] = bChain
if !found {
return utils.ErrNotFound
}
return nil
}
// resetAccountCDR resets the account out of values from CDR
func resetAccountCDR(ub *Account, action *Action, acts Actions, fltrS *FilterS, _ any) error {
if ub == nil {
return errors.New("nil account")
}
if cdrStorage == nil {
return fmt.Errorf("nil cdrStorage for %s action", utils.ToJSON(action))
}
account := ub.GetID()
filter := &utils.CDRsFilter{
Accounts: []string{account},
NotCosts: []float64{-1},
OrderBy: fmt.Sprintf("%s%sdesc", utils.OrderID, utils.InfieldSep),
Paginator: utils.Paginator{Limit: utils.IntPointer(1)},
}
cdrs, _, err := cdrStorage.GetCDRs(filter, false)
if err != nil {
return err
}
cd := cdrs[0].CostDetails
if cd == nil {
return errors.New("nil CostDetails")
}
acs := cd.AccountSummary
if acs == nil {
return errors.New("nil AccountSummary")
}
for _, bsum := range acs.BalanceSummaries {
if bsum == nil {
continue
}
if err := ub.setBalanceAction(&Action{
Balance: &BalanceFilter{
Uuid: &bsum.UUID,
ID: &bsum.ID,
Type: &bsum.Type,
Value: &utils.ValueFormula{Static: bsum.Value},
Disabled: &bsum.Disabled,
},
}, fltrS); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Error %s setting balance %s for account: %s", utils.Actions, err, bsum.UUID, account))
}
}
return nil
}
func export(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
var cgrEv *utils.CGREvent
switch {
case ub != nil:
cgrEv = &utils.CGREvent{
Tenant: utils.NewTenantID(ub.ID).Tenant,
ID: utils.GenUUID(),
Event: map[string]any{
utils.AccountField: ub.ID,
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.AllowNegative: ub.AllowNegative,
utils.Disabled: ub.Disabled,
utils.BalanceMap: ub.BalanceMap,
utils.UnitCounters: ub.UnitCounters,
utils.ActionTriggers: ub.ActionTriggers,
utils.UpdateTime: ub.UpdateTime,
},
APIOpts: map[string]any{
utils.MetaEventType: utils.AccountUpdate,
},
}
case extraData != nil:
ev, canCast := extraData.(*utils.CGREvent)
if !canCast {
return
}
cgrEv = ev // only export CGREvents
default:
return // nothing to post
}
args := &CGREventWithEeIDs{
EeIDs: strings.Split(a.ExtraParameters, utils.InfieldSep),
CGREvent: cgrEv,
}
var rply map[string]map[string]any
return connMgr.Call(context.TODO(), config.CgrConfig().ApierCfg().EEsConns,
utils.EeSv1ProcessEvent, args, &rply)
}
func resetThreshold(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
args := &utils.TenantIDWithAPIOpts{
TenantID: utils.NewTenantID(a.ExtraParameters),
}
var rply string
return connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().ThreshSConns,
utils.ThresholdSv1ResetThreshold, args, &rply)
}
func resetStatQueue(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
args := &utils.TenantIDWithAPIOpts{
TenantID: utils.NewTenantID(a.ExtraParameters),
}
var rply string
return connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().StatSConns,
utils.StatSv1ResetStatQueue, args, &rply)
}
func remoteSetAccount(ub *Account, a *Action, acs Actions, _ *FilterS, extraData any) (err error) {
client := &http.Client{Transport: httpPstrTransport}
var resp *http.Response
req := new(bytes.Buffer)
if err = json.NewEncoder(req).Encode(ub); err != nil {
return
}
if resp, err = client.Post(a.ExtraParameters, "application/json", req); err != nil {
return
}
acc := new(Account)
err = json.NewDecoder(resp.Body).Decode(acc)
if err != nil {
return
}
if len(acc.BalanceMap) != 0 {
*ub = *acc
}
return
}