From 979bdd1be180c76ec8d3024cc4d16f07af550244 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 1 May 2018 15:09:47 +0200 Subject: [PATCH] Diameter agent refactoring to use new SessionSv1 APIs as well as CGRReply in responses --- agents/dmtagent.go | 131 +++++++++++++++++++++-------------- agents/dmtagent_it_test.go | 4 +- agents/libdmt.go | 40 ++++++----- agents/libdmt_test.go | 5 +- agents/librad.go | 20 ++++++ agents/radagent.go | 4 +- cmd/cgr-loader/cgr-loader.go | 3 +- sessions/sessions.go | 21 ++++++ 8 files changed, 152 insertions(+), 76 deletions(-) diff --git a/agents/dmtagent.go b/agents/dmtagent.go index e961ed469..5466b6484 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/fiorix/go-diameter/diam" @@ -35,9 +36,10 @@ import ( "github.com/fiorix/go-diameter/diam/sm" ) -func NewDiameterAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection, +func NewDiameterAgent(cgrCfg *config.CGRConfig, sessionS rpcclient.RpcClientConnection, pubsubs rpcclient.RpcClientConnection) (*DiameterAgent, error) { - da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg, pubsubs: pubsubs, connMux: new(sync.Mutex)} + da := &DiameterAgent{cgrCfg: cgrCfg, sessionS: sessionS, + pubsubs: pubsubs, connMux: new(sync.Mutex)} if reflect.ValueOf(da.pubsubs).IsNil() { da.pubsubs = nil // Empty it so we can check it later } @@ -51,10 +53,10 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnectio } type DiameterAgent struct { - cgrCfg *config.CGRConfig - smg rpcclient.RpcClientConnection // Connection towards CGR-SMG component - pubsubs rpcclient.RpcClientConnection // Connection towards CGR-PubSub component - connMux *sync.Mutex // Protect connection for read/write + cgrCfg *config.CGRConfig + sessionS rpcclient.RpcClientConnection // Connection towards CGR-SMG component + pubsubs rpcclient.RpcClientConnection // Connection towards CGR-PubSub component + connMux *sync.Mutex // Protect connection for read/write } // Creates the message handlers @@ -77,8 +79,8 @@ func (self *DiameterAgent) handlers() diam.Handler { return dSM } -func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor, - processorVars map[string]string, cca *CCA) (bool, error) { +func (da DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor, + procVars processorVars, cca *CCA) (processed bool, err error) { passesAllFilters := true for _, fldFilter := range reqProcessor.RequestFilter { if passes, _ := passesFieldFilter(ccr.diamMessage, fldFilter, nil); !passes { @@ -93,14 +95,14 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro utils.Logger.Info(fmt.Sprintf(" CCR message: %s", ccr.diamMessage)) } if !reqProcessor.AppendCCA { - *cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + *cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm) } smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) if err != nil { utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) - *cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + *cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm) if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed), - false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil { utils.Logger.Err(fmt.Sprintf(" Processing message: %+v messageSetAVPsWithPath, error: %s", cca.diamMessage, err.Error())) return false, err } @@ -109,95 +111,122 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro if len(reqProcessor.Flags) != 0 { smgEv[utils.CGRFlags] = reqProcessor.Flags.String() // Populate CGRFlags automatically } - if reqProcessor.PublishEvent && self.pubsubs != nil { + if reqProcessor.PublishEvent && da.pubsubs != nil { evt, err := smgEv.AsMapStringString() if err != nil { - *cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + *cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm) if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed), - false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil { return false, err } utils.Logger.Err(fmt.Sprintf(" Processing message: %+v failed converting SMGEvent to pubsub one, error: %s", ccr.diamMessage, err)) return false, ErrDiameterRatingFailed } var reply string - if err := self.pubsubs.Call("PubSubV1.Publish", engine.CgrEvent(evt), &reply); err != nil { - *cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + if err := da.pubsubs.Call("PubSubV1.Publish", engine.CgrEvent(evt), &reply); err != nil { + *cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm) if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed), - false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil { return false, err } utils.Logger.Err(fmt.Sprintf(" Processing message: %+v failed publishing event, error: %s", ccr.diamMessage, err)) return false, ErrDiameterRatingFailed } } - var maxUsage time.Duration - processorVars[CGRResultCode] = strconv.Itoa(diam.Success) - processorVars[CGRError] = "" + procVars[CGRResultCode] = strconv.Itoa(diam.Success) if reqProcessor.DryRun { // DryRun does not send over network utils.Logger.Info(fmt.Sprintf(" SMGenericEvent: %+v", smgEv)) - processorVars[CGRResultCode] = strconv.Itoa(diam.LimitedSuccess) - } else { // Find out maxUsage over APIs + procVars[CGRResultCode] = strconv.Itoa(diam.LimitedSuccess) + } else { // Query SessionS over APIs + var tnt string + if tntIf, has := smgEv[utils.Tenant]; has { + if tntStr, canCast := utils.CastFieldIfToString(tntIf); canCast { + tnt = tntStr + } + } + cgrEv := &utils.CGREvent{ + Tenant: utils.FirstNonEmpty(tnt, + config.CgrConfig().DefaultTenant), + ID: "dmt:" + utils.UUIDSha1Prefix(), + Time: utils.TimePointer(time.Now()), + Event: smgEv, + } switch ccr.CCRequestType { case 1: - err = self.smg.Call("SMGenericV2.InitiateSession", smgEv, &maxUsage) + var initReply sessions.V1InitSessionReply + err = da.sessionS.Call(utils.SessionSv1InitiateSession, + procVars.asV1InitSessionArgs(cgrEv), &initReply) + if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&initReply, err); err != nil { + return + } case 2: - err = self.smg.Call("SMGenericV2.UpdateSession", smgEv, &maxUsage) + var updateReply sessions.V1UpdateSessionReply + err = da.sessionS.Call(utils.SessionSv1UpdateSession, + procVars.asV1UpdateSessionArgs(cgrEv), &updateReply) + if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&updateReply, err); err != nil { + return + } case 3, 4: // Handle them together since we generate CDR for them var rpl string if ccr.CCRequestType == 3 { - err = self.smg.Call("SMGenericV1.TerminateSession", smgEv, &rpl) + if err = da.sessionS.Call(utils.SessionSv1TerminateSession, + procVars.asV1TerminateSessionArgs(cgrEv), &rpl); err != nil { + procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()} + } } else if ccr.CCRequestType == 4 { - err = self.smg.Call("SMGenericV2.ChargeEvent", smgEv.Clone(), &maxUsage) - if maxUsage == 0 { - smgEv[utils.Usage] = 0 // For CDR not to debit + var evntRply sessions.V1ProcessEventReply + err = da.sessionS.Call(utils.SessionSv1ProcessEvent, + procVars.asV1ProcessEventArgs(cgrEv), &evntRply) + if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&evntRply, err); err != nil { + return } } - if self.cgrCfg.DiameterAgentCfg().CreateCDR && - (!self.cgrCfg.DiameterAgentCfg().CDRRequiresSession || err == nil || !strings.HasSuffix(err.Error(), utils.ErrNoActiveSession.Error())) { // Check if CDR requires session - if errCdr := self.smg.Call("SMGenericV1.ProcessCDR", smgEv, &rpl); errCdr != nil { + if da.cgrCfg.DiameterAgentCfg().CreateCDR && + (!da.cgrCfg.DiameterAgentCfg().CDRRequiresSession || err == nil || + !strings.HasSuffix(err.Error(), utils.ErrNoActiveSession.Error())) { // Check if CDR requires session + if errCdr := da.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &rpl); errCdr != nil { err = errCdr + procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()} } } } - if err != nil { + /*if err != nil { utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, API error: %s", ccr.diamMessage, err)) switch { // Prettify some errors case strings.HasSuffix(err.Error(), utils.ErrAccountNotFound.Error()): - processorVars[CGRError] = utils.ErrAccountNotFound.Error() + procVars[CGRError] = utils.ErrAccountNotFound.Error() case strings.HasSuffix(err.Error(), utils.ErrUserNotFound.Error()): - processorVars[CGRError] = utils.ErrUserNotFound.Error() + procVars[CGRError] = utils.ErrUserNotFound.Error() case strings.HasSuffix(err.Error(), utils.ErrInsufficientCredit.Error()): - processorVars[CGRError] = utils.ErrInsufficientCredit.Error() + procVars[CGRError] = utils.ErrInsufficientCredit.Error() case strings.HasSuffix(err.Error(), utils.ErrAccountDisabled.Error()): - processorVars[CGRError] = utils.ErrAccountDisabled.Error() + procVars[CGRError] = utils.ErrAccountDisabled.Error() case strings.HasSuffix(err.Error(), utils.ErrRatingPlanNotFound.Error()): - processorVars[CGRError] = utils.ErrRatingPlanNotFound.Error() + procVars[CGRError] = utils.ErrRatingPlanNotFound.Error() case strings.HasSuffix(err.Error(), utils.ErrUnauthorizedDestination.Error()): - processorVars[CGRError] = utils.ErrUnauthorizedDestination.Error() + procVars[CGRError] = utils.ErrUnauthorizedDestination.Error() default: // Unknown error - processorVars[CGRError] = err.Error() - processorVars[CGRResultCode] = strconv.Itoa(DiameterRatingFailed) + procVars[CGRError] = err.Error() + procVars[CGRResultCode] = strconv.Itoa(DiameterRatingFailed) } } - if maxUsage < 0 { - maxUsage = 0 - } - if prevMaxUsageStr, hasKey := processorVars[CGRMaxUsage]; hasKey { + */ + /*if prevMaxUsageStr, hasKey := procVars[CGRMaxUsage]; hasKey { prevMaxUsage, _ := utils.ParseDurationWithNanosecs(prevMaxUsageStr) if prevMaxUsage < maxUsage { maxUsage = prevMaxUsage } } - processorVars[CGRMaxUsage] = strconv.FormatInt(maxUsage.Nanoseconds(), 10) + */ } - if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, processorVars[CGRResultCode], - false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + diamCode, _ := procVars.valAsString(CGRResultCode) + if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, diamCode, + false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil { return false, err } - if err := cca.SetProcessorAVPs(reqProcessor, processorVars); err != nil { + if err := cca.SetProcessorAVPs(reqProcessor, procVars); err != nil { if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed), - false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil { + false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil { return false, err } utils.Logger.Err(fmt.Sprintf(" CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err)) @@ -217,9 +246,9 @@ func (self *DiameterAgent) handlerCCR(c diam.Conn, m *diam.Message) { } cca := NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) var processed, lclProcessed bool - processorVars := make(map[string]string) // Shared between processors + procVars := make(processorVars) // Shared between processors for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors { - lclProcessed, err = self.processCCR(ccr, reqProcessor, processorVars, cca) + lclProcessed, err = self.processCCR(ccr, reqProcessor, procVars, cca) if lclProcessed { // Process local so we don't overwrite globally processed = lclProcessed } diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 9c5f18acc..7ff1ea92f 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -171,7 +171,7 @@ func TestDmtAgentPopulateCCTotalOctets(t *testing.T) { ccr.diamMessage = ccr.AsBareDiameterMessage() cca := NewBareCCAFromCCR(ccr, "cgr-da", "cgrates.org") if err := cca.SetProcessorAVPs(daRP, - map[string]string{CGRError: "", CGRMaxUsage: "153600"}); err != nil { + processorVars{CGRError: "", CGRMaxUsage: "153600"}); err != nil { t.Error(err) } if avps, err := cca.diamMessage.FindAVPsWithPath([]interface{}{ @@ -207,12 +207,14 @@ func TestDmtAgentResetStorDb(t *testing.T) { } } +/* // Start CGR Engine func TestDmtAgentStartEngine(t *testing.T) { if _, err := engine.StopStartEngine(daCfgPath, 4000); err != nil { t.Fatal(err) } } +*/ // Connect rpc client to rater func TestDmtAgentApierRpcConn(t *testing.T) { diff --git a/agents/libdmt.go b/agents/libdmt.go index 6087e8a50..62b9ae276 100644 --- a/agents/libdmt.go +++ b/agents/libdmt.go @@ -179,7 +179,7 @@ func avpValAsString(a *diam.AVP) string { } // Handler for meta functions -func metaHandler(m *diam.Message, processorVars map[string]string, +func metaHandler(m *diam.Message, procVars processorVars, tag, arg string, dur time.Duration) (string, error) { switch tag { case META_CCR_USAGE: @@ -225,9 +225,9 @@ func metaHandler(m *diam.Message, processorVars map[string]string, // metaValueExponent will multiply the float value with the exponent provided. // Expects 2 arguments in template separated by | -func metaValueExponent(m *diam.Message, processorVars map[string]string, +func metaValueExponent(m *diam.Message, procVars processorVars, argsTpl utils.RSRFields, roundingDecimals int) (string, error) { - valStr := composedFieldvalue(m, argsTpl, 0, processorVars) + valStr := composedFieldvalue(m, argsTpl, 0, procVars) handlerArgs := strings.Split(valStr, utils.HandlerArgSep) if len(handlerArgs) != 2 { return "", errors.New("Unexpected number of arguments") @@ -244,9 +244,9 @@ func metaValueExponent(m *diam.Message, processorVars map[string]string, return strconv.FormatFloat(utils.Round(res, roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil } -func metaSum(m *diam.Message, processorVars map[string]string, +func metaSum(m *diam.Message, procVars processorVars, argsTpl utils.RSRFields, passAtIndex, roundingDecimals int) (string, error) { - valStr := composedFieldvalue(m, argsTpl, passAtIndex, processorVars) + valStr := composedFieldvalue(m, argsTpl, passAtIndex, procVars) handlerArgs := strings.Split(valStr, utils.HandlerArgSep) var summed float64 for _, arg := range handlerArgs { @@ -275,12 +275,13 @@ func avpsWithPath(m *diam.Message, rsrFld *utils.RSRField) ([]*diam.AVP, error) splitIntoInterface(rsrFld.Id, utils.HIERARCHY_SEP), dict.UndefinedVendorID) } -func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, processorVars map[string]string) (bool, int) { +func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, procVars processorVars) (bool, int) { if fieldFilter == nil { return true, 0 } - if val, hasIt := processorVars[fieldFilter.Id]; hasIt { // ProcessorVars have priority - if fieldFilter.FilterPasses(val) { + if val, hasIt := procVars[fieldFilter.Id]; hasIt { // ProcessorVars have priority + valStr, _ := utils.CastFieldIfToString(val) + if fieldFilter.FilterPasses(valStr) { return true, 0 } return false, 0 @@ -302,14 +303,15 @@ func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, processorVa return false, 0 } -func composedFieldvalue(m *diam.Message, outTpl utils.RSRFields, avpIdx int, processorVars map[string]string) string { +func composedFieldvalue(m *diam.Message, outTpl utils.RSRFields, avpIdx int, procVars processorVars) string { var outVal string for _, rsrTpl := range outTpl { if rsrTpl.IsStatic() { outVal += rsrTpl.ParseValue("") } else { - if val, hasIt := processorVars[rsrTpl.Id]; hasIt { // ProcessorVars have priority - outVal += rsrTpl.ParseValue(val) + if val, hasIt := procVars[rsrTpl.Id]; hasIt { // ProcessorVars have priority + valStr, _ := utils.CastFieldIfToString(val) + outVal += rsrTpl.ParseValue(valStr) continue } matchingAvps, err := avpsWithPath(m, rsrTpl) @@ -380,13 +382,13 @@ func serializeAVPValueFromString(dictAVP *dict.AVP, valStr, timezone string) ([] } func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField, - extraParam interface{}, processorVars map[string]string) (fmtValOut string, err error) { + extraParam interface{}, procVars processorVars) (fmtValOut string, err error) { var outVal string passAtIndex := -1 passedAllFilters := true for _, fldFilter := range cfgFld.FieldFilter { var pass bool - if pass, passAtIndex = passesFieldFilter(m, fldFilter, processorVars); !pass { + if pass, passAtIndex = passesFieldFilter(m, fldFilter, procVars); !pass { passedAllFilters = false break } @@ -406,19 +408,19 @@ func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField, case utils.META_HANDLER: switch cfgFld.HandlerId { case META_VALUE_EXPONENT: - outVal, err = metaValueExponent(m, processorVars, cfgFld.Value, 10) // FixMe: add here configured number of decimals + outVal, err = metaValueExponent(m, procVars, cfgFld.Value, 10) // FixMe: add here configured number of decimals case META_SUM: - outVal, err = metaSum(m, processorVars, cfgFld.Value, passAtIndex, 10) + outVal, err = metaSum(m, procVars, cfgFld.Value, passAtIndex, 10) default: - outVal, err = metaHandler(m, processorVars, cfgFld.HandlerId, cfgFld.Layout, extraParam.(time.Duration)) + outVal, err = metaHandler(m, procVars, cfgFld.HandlerId, cfgFld.Layout, extraParam.(time.Duration)) if err != nil { utils.Logger.Warning(fmt.Sprintf(" Ignoring processing of metafunction: %s, error: %s", cfgFld.HandlerId, err.Error())) } } case utils.META_COMPOSED: - outVal = composedFieldvalue(m, cfgFld.Value, 0, processorVars) + outVal = composedFieldvalue(m, cfgFld.Value, 0, procVars) case utils.MetaGrouped: // GroupedAVP - outVal = composedFieldvalue(m, cfgFld.Value, passAtIndex, processorVars) + outVal = composedFieldvalue(m, cfgFld.Value, passAtIndex, procVars) } if fmtValOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { utils.Logger.Warning(fmt.Sprintf(" Error when processing field template with tag: %s, error: %s", cfgFld.Tag, err.Error())) @@ -692,7 +694,7 @@ func (self *CCA) AsDiameterMessage() *diam.Message { } // SetProcessorAVPs will add AVPs to self.diameterMessage based on template defined in processor.CCAFields -func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, processorVars map[string]string) error { +func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, processorVars processorVars) error { for _, cfgFld := range reqProcessor.CCAFields { fmtOut, err := fieldOutVal(self.ccrMessage, cfgFld, nil, processorVars) if err == ErrFilterNotPassing { // Field not in or filter not passing, try match in answer diff --git a/agents/libdmt_test.go b/agents/libdmt_test.go index 1a0853632..f349d4d67 100644 --- a/agents/libdmt_test.go +++ b/agents/libdmt_test.go @@ -197,7 +197,8 @@ func TestFieldOutVal(t *testing.T) { t.Error("Should have error") } eOut = "360" - if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), map[string]string{"CGRError": "INSUFFICIENT_CREDIT"}); err != nil { + if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), + processorVars{"CGRError": "INSUFFICIENT_CREDIT"}); err != nil { t.Error(err) } else if fldOut != eOut { t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut) @@ -423,7 +424,7 @@ func TestCCASetProcessorAVPs(t *testing.T) { diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data }}) - if err := cca.SetProcessorAVPs(reqProcessor, map[string]string{}); err != nil { + if err := cca.SetProcessorAVPs(reqProcessor, processorVars{}); err != nil { t.Error(err) } else if ccaMsg := cca.AsDiameterMessage(); !reflect.DeepEqual(eMessage, ccaMsg) { t.Errorf("Expecting: %+v, received: %+v", eMessage, ccaMsg) diff --git a/agents/librad.go b/agents/librad.go index 2b458aab1..0825cf57b 100644 --- a/agents/librad.go +++ b/agents/librad.go @@ -172,6 +172,26 @@ func (pv processorVars) asV1TerminateSessionArgs(cgrEv *utils.CGREvent) (args *s return } +func (pv processorVars) asV1ProcessEventArgs(cgrEv *utils.CGREvent) (args *sessions.V1ProcessEventArgs) { + args = &sessions.V1ProcessEventArgs{ // defaults + Debit: true, + CGREvent: *cgrEv, + } + if !pv.hasSubsystems() { + return + } + if !pv.hasVar(utils.MetaAccounts) { + args.Debit = false + } + if pv.hasVar(utils.MetaResources) { + args.AllocateResources = true + } + if pv.hasVar(utils.MetaAttributes) { + args.GetAttributes = true + } + return +} + // radAttrVendorFromPath returns AttributenName and VendorName from path // path should be the form attributeName or vendorName/attributeName func attrVendorFromPath(path string) (attrName, vendorName string) { diff --git a/agents/radagent.go b/agents/radagent.go index c02d21cd6..d300ebf5e 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -199,10 +199,9 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor, } if ra.cgrCfg.RadiusAgentCfg().CreateCDR { if errCdr := ra.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &rpl); errCdr != nil { - procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()} err = errCdr + procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()} } - } if err != nil { return @@ -211,6 +210,7 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor, err = fmt.Errorf("unsupported radius request type: <%s>", procVars[MetaRadReqType]) } } + if err := radReplyAppendAttributes(reply, procVars, reqProcessor.ReplyFields); err != nil { return false, err } diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index dccb16743..f01a4893a 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -292,9 +292,10 @@ func main() { if err = tpReader.LoadAll(); err != nil { log.Fatal(err) } - if *verbose { + /*if *verbose { tpReader.ShowStatistics() } + */ if *dryRun { // We were just asked to parse the data, not saving it return } diff --git a/sessions/sessions.go b/sessions/sessions.go index 8624210c7..5e342d814 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1856,6 +1856,27 @@ type V1ProcessEventReply struct { Attributes *engine.AttrSProcessEventReply } +// AsCGRReply is part of utils.CGRReplier interface +func (v1Rply *V1ProcessEventReply) AsCGRReply() (cgrReply utils.CGRReply, err error) { + cgrReply = make(map[string]interface{}) + if v1Rply.MaxUsage != nil { + cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage + } + if v1Rply.ResourceAllocation != nil { + cgrReply[utils.CapResourceAllocation] = *v1Rply.ResourceAllocation + } + if v1Rply.Attributes != nil { + attrs := make(map[string]interface{}) + for _, fldName := range v1Rply.Attributes.AlteredFields { + if v1Rply.Attributes.CGREvent.HasField(fldName) { + attrs[fldName] = v1Rply.Attributes.CGREvent.Event[fldName] + } + } + cgrReply[utils.CapAttributes] = attrs + } + return +} + // Called on session end, should send the CDR to CDRS func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, args *V1ProcessEventArgs, rply *V1ProcessEventReply) (err error) {