Update *cdrlog action to support *remove_expired actions

Will also be able to process more than one CDR per
action.
This commit is contained in:
ionutboangiu
2024-02-16 12:52:11 -05:00
committed by Dan Christian Bogos
parent 267e6109e1
commit 41976e5721
3 changed files with 159 additions and 59 deletions

View File

@@ -224,17 +224,18 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
utils.Tenant: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.Tenant, utils.InfieldSep),
utils.AccountField: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep),
utils.Subject: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep),
utils.Cost: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAct+utils.NestingSep+utils.ActionValue, utils.InfieldSep),
}
template := make(map[string]string)
// overwrite default template
if a.ExtraParameters != "" {
if err = json.Unmarshal([]byte(a.ExtraParameters), &template); err != nil {
return
return err
}
for field, rsr := range template {
if defaultTemplate[field], err = config.NewRSRParsers(rsr,
config.CgrConfig().GeneralCfg().RSRSep); err != nil {
return
return err
}
}
}
@@ -243,9 +244,10 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
for key, val := range mapExtraData {
if defaultTemplate[key], err = config.NewRSRParsers(utils.IfaceAsString(val),
config.CgrConfig().GeneralCfg().RSRSep); err != nil {
return
return err
}
}
currentTime := time.Now()
// set stored cdr values
var cdrs []*CDR
@@ -254,6 +256,7 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
[]string{utils.MetaDebit, utils.MetaDebitReset,
utils.MetaTopUp, utils.MetaTopUpReset,
utils.MetaSetBalance, utils.MetaRemoveBalance,
utils.MetaRemoveExpired,
}, action.ActionType) || action.Balance == nil {
continue // Only log specific actions
}
@@ -261,8 +264,8 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
cdr := &CDR{
RunID: action.ActionType,
Source: utils.CDRLog,
SetupTime: time.Now(),
AnswerTime: time.Now(),
SetupTime: currentTime,
AnswerTime: currentTime,
OriginID: utils.GenUUID(),
ExtraFields: make(map[string]string),
PreRated: true,
@@ -270,32 +273,6 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
}
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost)
// If the action is of type *remove_balance, retrieve the balance value from the account
// and assign it to the CDR's Cost field.
if action.ActionType == utils.MetaRemoveBalance {
if acc == nil {
return fmt.Errorf("nil account for action %s", utils.ToJSON(action))
}
balanceChain, exists := acc.BalanceMap[action.Balance.GetType()]
if !exists {
return utils.ErrNotFound
}
found := false
for _, balance := range balanceChain {
if balance.MatchFilter(action.Balance, false, false) {
cdr.Cost = balance.Value
found = true
break
}
}
if !found {
return utils.ErrNotFound
}
} else {
// Otherwise, update the template to retrieve it from Action's BalanceValue.
defaultTemplate[utils.Cost] = config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAct+utils.NestingSep+utils.ActionValue, utils.InfieldSep)
}
cdrLogProvider := newCdrLogProvider(acc, action)
elem := reflect.ValueOf(cdr).Elem()
for key, rsrFlds := range defaultTemplate {
@@ -325,21 +302,73 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an
cdr.ExtraFields[key] = parsedValue
}
}
cdrs = append(cdrs, cdr)
var rply string
// After compute the CDR send it to CDR Server to be processed
if err := connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().CDRsConns,
utils.CDRsV1ProcessEvent,
&ArgV1ProcessEvent{
Flags: []string{utils.ConcatenatedKey(utils.MetaChargers, "false")}, // do not try to get the chargers for cdrlog
CGREvent: *cdr.AsCGREvent(),
}, &rply); err != nil {
return err
// Function to process balances and append CDR if conditions are met.
processBalances := func(checkFunc func(*Balance) bool) error {
if acc == nil {
return fmt.Errorf("nil account for action %s", utils.ToJSON(action))
}
balanceChain, exists := acc.BalanceMap[action.Balance.GetType()]
if !exists {
return utils.ErrNotFound
}
found := false
for _, balance := range balanceChain {
if checkFunc(balance) {
// Create a new CDR instance for each balance that meets the condition.
newCDR := *cdr // Copy CDR's values to a new CDR instance.
newCDR.Cost = balance.Value
newCDR.OriginID = utils.GenUUID() // OriginID must be unique for every CDR.
newCDR.CGRID = utils.Sha1(newCDR.OriginID, newCDR.OriginHost)
newCDR.ExtraFields[utils.BalanceID] = balance.ID
cdrs = append(cdrs, &newCDR) // Append the address of the new instance.
found = true
}
}
if !found {
return utils.ErrNotFound
}
return nil
}
// If the action is of type *remove_balance or *remove_expired, for each matched balance,
// assign the balance values to the CDR cost and append to the list of CDRs.
switch action.ActionType {
case utils.MetaRemoveBalance:
if err = processBalances(func(b *Balance) bool {
return b.MatchFilter(action.Balance, false, false)
}); err != nil {
return err
}
continue
case utils.MetaRemoveExpired:
if err = processBalances(func(b *Balance) bool {
return b.IsExpiredAt(currentTime)
}); err != nil {
return err
}
continue
}
cdrs = append(cdrs, cdr)
}
events := make([]*utils.CGREvent, 0, len(cdrs))
for _, cdr := range cdrs {
events = append(events, cdr.AsCGREvent())
}
var reply string
if err := connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().CDRsConns,
utils.CDRsV1ProcessEvents,
&ArgV1ProcessEvents{
Flags: []string{utils.ConcatenatedKey(utils.MetaChargers, "false")},
CGREvents: events,
}, &reply); err != nil {
return err
}
b, _ := json.Marshal(cdrs)
a.ExpirationString = string(b) // testing purpose only
return
return nil
}
func resetTriggersAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) {
@@ -1002,7 +1031,6 @@ func removeExpired(acc *Account, action *Action, _ Actions, _ *FilterS, extraDat
if acc == nil {
return fmt.Errorf("nil account for %s action", utils.ToJSON(action))
}
bChain, exists := acc.BalanceMap[action.Balance.GetType()]
if !exists {
return utils.ErrNotFound

View File

@@ -2615,17 +2615,17 @@ func TestCacheGetClonedActions(t *testing.T) {
// TestCdrLogAction
type RPCMock struct {
args *ArgV1ProcessEvent
args *ArgV1ProcessEvents
}
func (r *RPCMock) Call(ctx *context.Context, method string, args any, rply any) error {
if method != utils.CDRsV1ProcessEvent {
if method != utils.CDRsV1ProcessEvents {
return rpcclient.ErrUnsupporteServiceMethod
}
if r.args != nil {
return fmt.Errorf("There should be only one call to this function")
}
r.args = args.(*ArgV1ProcessEvent)
r.args = args.(*ArgV1ProcessEvents)
rp := rply.(*string)
*rp = utils.OK
return nil
@@ -2695,40 +2695,40 @@ func TestCdrLogAction(t *testing.T) {
if mock.args == nil {
t.Fatalf("Expected a call to %s", utils.CDRsV1ProcessEvent)
}
expCgrEv := utils.CGREvent{
expCgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: mock.args.CGREvent.ID,
ID: mock.args.CGREvents[0].ID,
Event: map[string]any{
"Account": "1001",
"ActionID": "CdrDebit",
"AnswerTime": mock.args.CGREvent.Event["AnswerTime"],
"AnswerTime": mock.args.CGREvents[0].Event["AnswerTime"],
"BalanceID": "*default",
"BalanceValue": "10",
"CGRID": mock.args.CGREvent.Event["CGRID"],
"CGRID": mock.args.CGREvents[0].Event["CGRID"],
"Category": "",
"Cost": 9.95,
"CostSource": "",
"Destination": "",
"ExtraInfo": "",
"OrderID": mock.args.CGREvent.Event["OrderID"],
"OrderID": mock.args.CGREvents[0].Event["OrderID"],
"OriginHost": "127.0.0.1",
"OriginID": mock.args.CGREvent.Event["OriginID"],
"OriginID": mock.args.CGREvents[0].Event["OriginID"],
"Partial": false,
"PreRated": true,
"RequestType": "*none",
"RunID": "*debit",
"SetupTime": mock.args.CGREvent.Event["SetupTime"],
"SetupTime": mock.args.CGREvents[0].Event["SetupTime"],
"Source": utils.CDRLog,
"Subject": "1001",
"Tenant": "cgrates.org",
"ToR": "*monetary",
"Usage": mock.args.CGREvent.Event["Usage"],
"Usage": mock.args.CGREvents[0].Event["Usage"],
"test": "val",
},
APIOpts: map[string]any{},
}
if !reflect.DeepEqual(expCgrEv, mock.args.CGREvent) {
t.Errorf("Expected: %+v \n,received: %+v", expCgrEv, mock.args.CGREvent)
if !reflect.DeepEqual(expCgrEv, mock.args.CGREvents[0]) {
t.Errorf("Expected: %+v \n,received: %+v", utils.ToJSON(expCgrEv), utils.ToJSON(mock.args.CGREvents[0]))
}
}

View File

@@ -413,8 +413,8 @@ cgrates.org,sms,1001,2014-01-14T00:00:00Z,RP_ANY,`,
// The test steps are as follows:
// 1. Create an account with 2 balances of types *sms and *monetary. The topup action for the *monetary one will also include
// the creation of a CDR.
// 2. Set an action bundle with both "*remove_balance" and "*cdrlog" actions.
// 3. Retrieve both CDRs and check whether the their fields are set correctly.
// 2. Set 3 action bundles with "*topup_reset", "*remove_balance" and "*remove_expired" actions, each coupled with a "*cdrlog" action.
// 3. Retrieve the CDRs and check whether the their fields are set correctly.
func TestBalanceCDRLog(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
@@ -461,8 +461,10 @@ cgrates.org,ACC_TEST,PACKAGE_ACC_TEST,,,`,
PACKAGE_ACC_TEST,ACT_TOPUP_MONETARY,*asap,10
PACKAGE_ACC_TEST,ACT_TOPUP_SMS,*asap,10`,
utils.ActionsCsv: `#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16]
ACT_REMOVE_BALANCE_MONETARY,*cdrlog,"{""BalanceID"":""~*acnt.BalanceID""}",,,,,,,,,,,,,,
ACT_REMOVE_BALANCE_MONETARY,*cdrlog,,,,,,,,,,,,,,,
ACT_REMOVE_BALANCE_MONETARY,*remove_balance,,,balance_monetary,*monetary,,,,,,,,,,,
ACT_REMOVE_EXPIRED,*cdrlog,,,,*monetary,,,,,,,,,,,
ACT_REMOVE_EXPIRED,*remove_expired,,,,*monetary,,,,,,,,,,,
ACT_TOPUP_MONETARY,*cdrlog,"{""BalanceID"":""~*acnt.BalanceID""}",,,,,,,,,,,,,,
ACT_TOPUP_MONETARY,*topup_reset,,,balance_monetary,*monetary,,*any,,,*unlimited,,150,20,false,false,20
ACT_TOPUP_SMS,*topup_reset,,,balance_sms,*sms,,*any,,,*unlimited,,1000,10,false,false,10`,
@@ -540,6 +542,76 @@ ACT_TOPUP_SMS,*topup_reset,,,balance_sms,*sms,,*any,,,*unlimited,,1000,10,false,
}
})
t.Run("RemoveExpiredBalances", func(t *testing.T) {
expiryTime := time.Now().Add(10 * time.Millisecond)
attrSetBalance := utils.AttrSetBalances{
Tenant: "cgrates.org",
Account: "ACC_TEST",
Balances: []*utils.AttrBalance{
{
BalanceType: utils.MetaMonetary,
Value: 10,
Balance: map[string]any{
utils.ID: "expired_balance1",
utils.ExpiryTime: expiryTime,
},
},
{
BalanceType: utils.MetaMonetary,
Value: 11,
Balance: map[string]any{
utils.ID: "expired_balance2",
utils.ExpiryTime: expiryTime,
},
},
{
BalanceType: utils.MetaMonetary,
Value: 12,
Balance: map[string]any{
utils.ID: "expired_balance3",
utils.ExpiryTime: expiryTime,
},
},
},
}
var reply string
if err := client.Call(context.Background(), utils.APIerSv1SetBalances, &attrSetBalance, &reply); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
attrsEA := &utils.AttrExecuteAction{Tenant: "cgrates.org", Account: "ACC_TEST", ActionsId: "ACT_REMOVE_EXPIRED"}
if err := client.Call(context.Background(), utils.APIerSv1ExecuteAction, attrsEA, &reply); err != nil {
t.Error(err)
}
})
t.Run("CheckRemoveExpiredCDR", func(t *testing.T) {
var cdrs []*engine.CDR
if err := client.Call(context.Background(), utils.CDRsV1GetCDRs, &utils.RPCCDRsFilterWithAPIOpts{
RPCCDRsFilter: &utils.RPCCDRsFilter{
RunIDs: []string{"*remove_expired"},
OrderBy: utils.Cost,
}}, &cdrs); err != nil {
t.Fatal(err)
}
if len(cdrs) != 3 ||
cdrs[0].RunID != utils.MetaRemoveExpired ||
cdrs[0].Source != utils.CDRLog ||
cdrs[0].ToR != utils.MetaMonetary ||
cdrs[0].Cost != 10 ||
cdrs[1].RunID != utils.MetaRemoveExpired ||
cdrs[1].Source != utils.CDRLog ||
cdrs[1].ToR != utils.MetaMonetary ||
cdrs[1].Cost != 11 ||
cdrs[2].RunID != utils.MetaRemoveExpired ||
cdrs[2].Source != utils.CDRLog ||
cdrs[2].ToR != utils.MetaMonetary ||
cdrs[2].Cost != 12 {
t.Errorf("unexpected cdrs received: %v", utils.ToJSON(cdrs))
}
})
t.Run("CheckFinalBalances", func(t *testing.T) {
var acnt engine.Account
if err := client.Call(context.Background(), utils.APIerSv2GetAccount,