From 41976e57218f9b0e4cf12af38f9db0c0938c3a6b Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 16 Feb 2024 12:52:11 -0500 Subject: [PATCH] Update *cdrlog action to support *remove_expired actions Will also be able to process more than one CDR per action. --- engine/action.go | 114 +++++++++++++++++++------------ engine/actions_test.go | 26 +++---- general_tests/balance_it_test.go | 78 ++++++++++++++++++++- 3 files changed, 159 insertions(+), 59 deletions(-) diff --git a/engine/action.go b/engine/action.go index 007017d4b..6780ca65b 100644 --- a/engine/action.go +++ b/engine/action.go @@ -224,17 +224,18 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an utils.Tenant: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.Tenant, utils.InfieldSep), utils.AccountField: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep), utils.Subject: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAcnt+utils.NestingSep+utils.AccountField, utils.InfieldSep), + utils.Cost: config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAct+utils.NestingSep+utils.ActionValue, utils.InfieldSep), } template := make(map[string]string) // overwrite default template if a.ExtraParameters != "" { if err = json.Unmarshal([]byte(a.ExtraParameters), &template); err != nil { - return + return err } for field, rsr := range template { if defaultTemplate[field], err = config.NewRSRParsers(rsr, config.CgrConfig().GeneralCfg().RSRSep); err != nil { - return + return err } } } @@ -243,9 +244,10 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an for key, val := range mapExtraData { if defaultTemplate[key], err = config.NewRSRParsers(utils.IfaceAsString(val), config.CgrConfig().GeneralCfg().RSRSep); err != nil { - return + return err } } + currentTime := time.Now() // set stored cdr values var cdrs []*CDR @@ -254,6 +256,7 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an []string{utils.MetaDebit, utils.MetaDebitReset, utils.MetaTopUp, utils.MetaTopUpReset, utils.MetaSetBalance, utils.MetaRemoveBalance, + utils.MetaRemoveExpired, }, action.ActionType) || action.Balance == nil { continue // Only log specific actions } @@ -261,8 +264,8 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an cdr := &CDR{ RunID: action.ActionType, Source: utils.CDRLog, - SetupTime: time.Now(), - AnswerTime: time.Now(), + SetupTime: currentTime, + AnswerTime: currentTime, OriginID: utils.GenUUID(), ExtraFields: make(map[string]string), PreRated: true, @@ -270,32 +273,6 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an } cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost) - // If the action is of type *remove_balance, retrieve the balance value from the account - // and assign it to the CDR's Cost field. - if action.ActionType == utils.MetaRemoveBalance { - if acc == nil { - return fmt.Errorf("nil account for action %s", utils.ToJSON(action)) - } - balanceChain, exists := acc.BalanceMap[action.Balance.GetType()] - if !exists { - return utils.ErrNotFound - } - found := false - for _, balance := range balanceChain { - if balance.MatchFilter(action.Balance, false, false) { - cdr.Cost = balance.Value - found = true - break - } - } - if !found { - return utils.ErrNotFound - } - } else { - // Otherwise, update the template to retrieve it from Action's BalanceValue. - defaultTemplate[utils.Cost] = config.NewRSRParsersMustCompile(utils.DynamicDataPrefix+utils.MetaAct+utils.NestingSep+utils.ActionValue, utils.InfieldSep) - } - cdrLogProvider := newCdrLogProvider(acc, action) elem := reflect.ValueOf(cdr).Elem() for key, rsrFlds := range defaultTemplate { @@ -325,21 +302,73 @@ func cdrLogAction(acc *Account, a *Action, acs Actions, _ *FilterS, extraData an cdr.ExtraFields[key] = parsedValue } } - cdrs = append(cdrs, cdr) - var rply string - // After compute the CDR send it to CDR Server to be processed - if err := connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().CDRsConns, - utils.CDRsV1ProcessEvent, - &ArgV1ProcessEvent{ - Flags: []string{utils.ConcatenatedKey(utils.MetaChargers, "false")}, // do not try to get the chargers for cdrlog - CGREvent: *cdr.AsCGREvent(), - }, &rply); err != nil { - return err + + // Function to process balances and append CDR if conditions are met. + processBalances := func(checkFunc func(*Balance) bool) error { + if acc == nil { + return fmt.Errorf("nil account for action %s", utils.ToJSON(action)) + } + balanceChain, exists := acc.BalanceMap[action.Balance.GetType()] + if !exists { + return utils.ErrNotFound + } + found := false + for _, balance := range balanceChain { + if checkFunc(balance) { + // Create a new CDR instance for each balance that meets the condition. + newCDR := *cdr // Copy CDR's values to a new CDR instance. + newCDR.Cost = balance.Value + newCDR.OriginID = utils.GenUUID() // OriginID must be unique for every CDR. + newCDR.CGRID = utils.Sha1(newCDR.OriginID, newCDR.OriginHost) + newCDR.ExtraFields[utils.BalanceID] = balance.ID + cdrs = append(cdrs, &newCDR) // Append the address of the new instance. + found = true + } + } + if !found { + return utils.ErrNotFound + } + return nil } + + // If the action is of type *remove_balance or *remove_expired, for each matched balance, + // assign the balance values to the CDR cost and append to the list of CDRs. + switch action.ActionType { + case utils.MetaRemoveBalance: + if err = processBalances(func(b *Balance) bool { + return b.MatchFilter(action.Balance, false, false) + }); err != nil { + return err + } + continue + case utils.MetaRemoveExpired: + if err = processBalances(func(b *Balance) bool { + return b.IsExpiredAt(currentTime) + }); err != nil { + return err + } + continue + } + + cdrs = append(cdrs, cdr) + } + + events := make([]*utils.CGREvent, 0, len(cdrs)) + for _, cdr := range cdrs { + events = append(events, cdr.AsCGREvent()) + } + var reply string + if err := connMgr.Call(context.TODO(), config.CgrConfig().SchedulerCfg().CDRsConns, + utils.CDRsV1ProcessEvents, + &ArgV1ProcessEvents{ + Flags: []string{utils.ConcatenatedKey(utils.MetaChargers, "false")}, + CGREvents: events, + }, &reply); err != nil { + return err } b, _ := json.Marshal(cdrs) a.ExpirationString = string(b) // testing purpose only - return + return nil } func resetTriggersAction(ub *Account, a *Action, acs Actions, fltrS *FilterS, extraData any) (err error) { @@ -1002,7 +1031,6 @@ func removeExpired(acc *Account, action *Action, _ Actions, _ *FilterS, extraDat if acc == nil { return fmt.Errorf("nil account for %s action", utils.ToJSON(action)) } - bChain, exists := acc.BalanceMap[action.Balance.GetType()] if !exists { return utils.ErrNotFound diff --git a/engine/actions_test.go b/engine/actions_test.go index 7e7775087..527728e98 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2615,17 +2615,17 @@ func TestCacheGetClonedActions(t *testing.T) { // TestCdrLogAction type RPCMock struct { - args *ArgV1ProcessEvent + args *ArgV1ProcessEvents } func (r *RPCMock) Call(ctx *context.Context, method string, args any, rply any) error { - if method != utils.CDRsV1ProcessEvent { + if method != utils.CDRsV1ProcessEvents { return rpcclient.ErrUnsupporteServiceMethod } if r.args != nil { return fmt.Errorf("There should be only one call to this function") } - r.args = args.(*ArgV1ProcessEvent) + r.args = args.(*ArgV1ProcessEvents) rp := rply.(*string) *rp = utils.OK return nil @@ -2695,40 +2695,40 @@ func TestCdrLogAction(t *testing.T) { if mock.args == nil { t.Fatalf("Expected a call to %s", utils.CDRsV1ProcessEvent) } - expCgrEv := utils.CGREvent{ + expCgrEv := &utils.CGREvent{ Tenant: "cgrates.org", - ID: mock.args.CGREvent.ID, + ID: mock.args.CGREvents[0].ID, Event: map[string]any{ "Account": "1001", "ActionID": "CdrDebit", - "AnswerTime": mock.args.CGREvent.Event["AnswerTime"], + "AnswerTime": mock.args.CGREvents[0].Event["AnswerTime"], "BalanceID": "*default", "BalanceValue": "10", - "CGRID": mock.args.CGREvent.Event["CGRID"], + "CGRID": mock.args.CGREvents[0].Event["CGRID"], "Category": "", "Cost": 9.95, "CostSource": "", "Destination": "", "ExtraInfo": "", - "OrderID": mock.args.CGREvent.Event["OrderID"], + "OrderID": mock.args.CGREvents[0].Event["OrderID"], "OriginHost": "127.0.0.1", - "OriginID": mock.args.CGREvent.Event["OriginID"], + "OriginID": mock.args.CGREvents[0].Event["OriginID"], "Partial": false, "PreRated": true, "RequestType": "*none", "RunID": "*debit", - "SetupTime": mock.args.CGREvent.Event["SetupTime"], + "SetupTime": mock.args.CGREvents[0].Event["SetupTime"], "Source": utils.CDRLog, "Subject": "1001", "Tenant": "cgrates.org", "ToR": "*monetary", - "Usage": mock.args.CGREvent.Event["Usage"], + "Usage": mock.args.CGREvents[0].Event["Usage"], "test": "val", }, APIOpts: map[string]any{}, } - if !reflect.DeepEqual(expCgrEv, mock.args.CGREvent) { - t.Errorf("Expected: %+v \n,received: %+v", expCgrEv, mock.args.CGREvent) + if !reflect.DeepEqual(expCgrEv, mock.args.CGREvents[0]) { + t.Errorf("Expected: %+v \n,received: %+v", utils.ToJSON(expCgrEv), utils.ToJSON(mock.args.CGREvents[0])) } } diff --git a/general_tests/balance_it_test.go b/general_tests/balance_it_test.go index 3394769c6..ebb23927c 100644 --- a/general_tests/balance_it_test.go +++ b/general_tests/balance_it_test.go @@ -413,8 +413,8 @@ cgrates.org,sms,1001,2014-01-14T00:00:00Z,RP_ANY,`, // The test steps are as follows: // 1. Create an account with 2 balances of types *sms and *monetary. The topup action for the *monetary one will also include // the creation of a CDR. -// 2. Set an action bundle with both "*remove_balance" and "*cdrlog" actions. -// 3. Retrieve both CDRs and check whether the their fields are set correctly. +// 2. Set 3 action bundles with "*topup_reset", "*remove_balance" and "*remove_expired" actions, each coupled with a "*cdrlog" action. +// 3. Retrieve the CDRs and check whether the their fields are set correctly. func TestBalanceCDRLog(t *testing.T) { switch *dbType { case utils.MetaInternal: @@ -461,8 +461,10 @@ cgrates.org,ACC_TEST,PACKAGE_ACC_TEST,,,`, PACKAGE_ACC_TEST,ACT_TOPUP_MONETARY,*asap,10 PACKAGE_ACC_TEST,ACT_TOPUP_SMS,*asap,10`, utils.ActionsCsv: `#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16] -ACT_REMOVE_BALANCE_MONETARY,*cdrlog,"{""BalanceID"":""~*acnt.BalanceID""}",,,,,,,,,,,,,, +ACT_REMOVE_BALANCE_MONETARY,*cdrlog,,,,,,,,,,,,,,, ACT_REMOVE_BALANCE_MONETARY,*remove_balance,,,balance_monetary,*monetary,,,,,,,,,,, +ACT_REMOVE_EXPIRED,*cdrlog,,,,*monetary,,,,,,,,,,, +ACT_REMOVE_EXPIRED,*remove_expired,,,,*monetary,,,,,,,,,,, ACT_TOPUP_MONETARY,*cdrlog,"{""BalanceID"":""~*acnt.BalanceID""}",,,,,,,,,,,,,, ACT_TOPUP_MONETARY,*topup_reset,,,balance_monetary,*monetary,,*any,,,*unlimited,,150,20,false,false,20 ACT_TOPUP_SMS,*topup_reset,,,balance_sms,*sms,,*any,,,*unlimited,,1000,10,false,false,10`, @@ -540,6 +542,76 @@ ACT_TOPUP_SMS,*topup_reset,,,balance_sms,*sms,,*any,,,*unlimited,,1000,10,false, } }) + t.Run("RemoveExpiredBalances", func(t *testing.T) { + expiryTime := time.Now().Add(10 * time.Millisecond) + attrSetBalance := utils.AttrSetBalances{ + Tenant: "cgrates.org", + Account: "ACC_TEST", + Balances: []*utils.AttrBalance{ + { + BalanceType: utils.MetaMonetary, + Value: 10, + Balance: map[string]any{ + utils.ID: "expired_balance1", + utils.ExpiryTime: expiryTime, + }, + }, + { + BalanceType: utils.MetaMonetary, + Value: 11, + Balance: map[string]any{ + utils.ID: "expired_balance2", + utils.ExpiryTime: expiryTime, + }, + }, + { + BalanceType: utils.MetaMonetary, + Value: 12, + Balance: map[string]any{ + utils.ID: "expired_balance3", + utils.ExpiryTime: expiryTime, + }, + }, + }, + } + var reply string + if err := client.Call(context.Background(), utils.APIerSv1SetBalances, &attrSetBalance, &reply); err != nil { + t.Error(err) + } + time.Sleep(10 * time.Millisecond) + attrsEA := &utils.AttrExecuteAction{Tenant: "cgrates.org", Account: "ACC_TEST", ActionsId: "ACT_REMOVE_EXPIRED"} + if err := client.Call(context.Background(), utils.APIerSv1ExecuteAction, attrsEA, &reply); err != nil { + t.Error(err) + } + }) + + t.Run("CheckRemoveExpiredCDR", func(t *testing.T) { + var cdrs []*engine.CDR + if err := client.Call(context.Background(), utils.CDRsV1GetCDRs, &utils.RPCCDRsFilterWithAPIOpts{ + RPCCDRsFilter: &utils.RPCCDRsFilter{ + RunIDs: []string{"*remove_expired"}, + OrderBy: utils.Cost, + }}, &cdrs); err != nil { + t.Fatal(err) + } + + if len(cdrs) != 3 || + cdrs[0].RunID != utils.MetaRemoveExpired || + cdrs[0].Source != utils.CDRLog || + cdrs[0].ToR != utils.MetaMonetary || + cdrs[0].Cost != 10 || + cdrs[1].RunID != utils.MetaRemoveExpired || + cdrs[1].Source != utils.CDRLog || + cdrs[1].ToR != utils.MetaMonetary || + cdrs[1].Cost != 11 || + cdrs[2].RunID != utils.MetaRemoveExpired || + cdrs[2].Source != utils.CDRLog || + cdrs[2].ToR != utils.MetaMonetary || + cdrs[2].Cost != 12 { + t.Errorf("unexpected cdrs received: %v", utils.ToJSON(cdrs)) + } + }) + t.Run("CheckFinalBalances", func(t *testing.T) { var acnt engine.Account if err := client.Call(context.Background(), utils.APIerSv2GetAccount,