mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Diameter agent refactoring to use new SessionSv1 APIs as well as CGRReply in responses
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/fiorix/go-diameter/diam"
|
||||
@@ -35,9 +36,10 @@ import (
|
||||
"github.com/fiorix/go-diameter/diam/sm"
|
||||
)
|
||||
|
||||
func NewDiameterAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection,
|
||||
func NewDiameterAgent(cgrCfg *config.CGRConfig, sessionS rpcclient.RpcClientConnection,
|
||||
pubsubs rpcclient.RpcClientConnection) (*DiameterAgent, error) {
|
||||
da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg, pubsubs: pubsubs, connMux: new(sync.Mutex)}
|
||||
da := &DiameterAgent{cgrCfg: cgrCfg, sessionS: sessionS,
|
||||
pubsubs: pubsubs, connMux: new(sync.Mutex)}
|
||||
if reflect.ValueOf(da.pubsubs).IsNil() {
|
||||
da.pubsubs = nil // Empty it so we can check it later
|
||||
}
|
||||
@@ -51,10 +53,10 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnectio
|
||||
}
|
||||
|
||||
type DiameterAgent struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
smg rpcclient.RpcClientConnection // Connection towards CGR-SMG component
|
||||
pubsubs rpcclient.RpcClientConnection // Connection towards CGR-PubSub component
|
||||
connMux *sync.Mutex // Protect connection for read/write
|
||||
cgrCfg *config.CGRConfig
|
||||
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SMG component
|
||||
pubsubs rpcclient.RpcClientConnection // Connection towards CGR-PubSub component
|
||||
connMux *sync.Mutex // Protect connection for read/write
|
||||
}
|
||||
|
||||
// Creates the message handlers
|
||||
@@ -77,8 +79,8 @@ func (self *DiameterAgent) handlers() diam.Handler {
|
||||
return dSM
|
||||
}
|
||||
|
||||
func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor,
|
||||
processorVars map[string]string, cca *CCA) (bool, error) {
|
||||
func (da DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor,
|
||||
procVars processorVars, cca *CCA) (processed bool, err error) {
|
||||
passesAllFilters := true
|
||||
for _, fldFilter := range reqProcessor.RequestFilter {
|
||||
if passes, _ := passesFieldFilter(ccr.diamMessage, fldFilter, nil); !passes {
|
||||
@@ -93,14 +95,14 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro
|
||||
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> CCR message: %s", ccr.diamMessage))
|
||||
}
|
||||
if !reqProcessor.AppendCCA {
|
||||
*cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
}
|
||||
smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err))
|
||||
*cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
|
||||
false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v messageSetAVPsWithPath, error: %s", cca.diamMessage, err.Error()))
|
||||
return false, err
|
||||
}
|
||||
@@ -109,95 +111,122 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro
|
||||
if len(reqProcessor.Flags) != 0 {
|
||||
smgEv[utils.CGRFlags] = reqProcessor.Flags.String() // Populate CGRFlags automatically
|
||||
}
|
||||
if reqProcessor.PublishEvent && self.pubsubs != nil {
|
||||
if reqProcessor.PublishEvent && da.pubsubs != nil {
|
||||
evt, err := smgEv.AsMapStringString()
|
||||
if err != nil {
|
||||
*cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
|
||||
false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
return false, err
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed converting SMGEvent to pubsub one, error: %s", ccr.diamMessage, err))
|
||||
return false, ErrDiameterRatingFailed
|
||||
}
|
||||
var reply string
|
||||
if err := self.pubsubs.Call("PubSubV1.Publish", engine.CgrEvent(evt), &reply); err != nil {
|
||||
*cca = *NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
if err := da.pubsubs.Call("PubSubV1.Publish", engine.CgrEvent(evt), &reply); err != nil {
|
||||
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
|
||||
false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
return false, err
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed publishing event, error: %s", ccr.diamMessage, err))
|
||||
return false, ErrDiameterRatingFailed
|
||||
}
|
||||
}
|
||||
var maxUsage time.Duration
|
||||
processorVars[CGRResultCode] = strconv.Itoa(diam.Success)
|
||||
processorVars[CGRError] = ""
|
||||
procVars[CGRResultCode] = strconv.Itoa(diam.Success)
|
||||
if reqProcessor.DryRun { // DryRun does not send over network
|
||||
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> SMGenericEvent: %+v", smgEv))
|
||||
processorVars[CGRResultCode] = strconv.Itoa(diam.LimitedSuccess)
|
||||
} else { // Find out maxUsage over APIs
|
||||
procVars[CGRResultCode] = strconv.Itoa(diam.LimitedSuccess)
|
||||
} else { // Query SessionS over APIs
|
||||
var tnt string
|
||||
if tntIf, has := smgEv[utils.Tenant]; has {
|
||||
if tntStr, canCast := utils.CastFieldIfToString(tntIf); canCast {
|
||||
tnt = tntStr
|
||||
}
|
||||
}
|
||||
cgrEv := &utils.CGREvent{
|
||||
Tenant: utils.FirstNonEmpty(tnt,
|
||||
config.CgrConfig().DefaultTenant),
|
||||
ID: "dmt:" + utils.UUIDSha1Prefix(),
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: smgEv,
|
||||
}
|
||||
switch ccr.CCRequestType {
|
||||
case 1:
|
||||
err = self.smg.Call("SMGenericV2.InitiateSession", smgEv, &maxUsage)
|
||||
var initReply sessions.V1InitSessionReply
|
||||
err = da.sessionS.Call(utils.SessionSv1InitiateSession,
|
||||
procVars.asV1InitSessionArgs(cgrEv), &initReply)
|
||||
if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&initReply, err); err != nil {
|
||||
return
|
||||
}
|
||||
case 2:
|
||||
err = self.smg.Call("SMGenericV2.UpdateSession", smgEv, &maxUsage)
|
||||
var updateReply sessions.V1UpdateSessionReply
|
||||
err = da.sessionS.Call(utils.SessionSv1UpdateSession,
|
||||
procVars.asV1UpdateSessionArgs(cgrEv), &updateReply)
|
||||
if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&updateReply, err); err != nil {
|
||||
return
|
||||
}
|
||||
case 3, 4: // Handle them together since we generate CDR for them
|
||||
var rpl string
|
||||
if ccr.CCRequestType == 3 {
|
||||
err = self.smg.Call("SMGenericV1.TerminateSession", smgEv, &rpl)
|
||||
if err = da.sessionS.Call(utils.SessionSv1TerminateSession,
|
||||
procVars.asV1TerminateSessionArgs(cgrEv), &rpl); err != nil {
|
||||
procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()}
|
||||
}
|
||||
} else if ccr.CCRequestType == 4 {
|
||||
err = self.smg.Call("SMGenericV2.ChargeEvent", smgEv.Clone(), &maxUsage)
|
||||
if maxUsage == 0 {
|
||||
smgEv[utils.Usage] = 0 // For CDR not to debit
|
||||
var evntRply sessions.V1ProcessEventReply
|
||||
err = da.sessionS.Call(utils.SessionSv1ProcessEvent,
|
||||
procVars.asV1ProcessEventArgs(cgrEv), &evntRply)
|
||||
if procVars[utils.MetaCGRReply], err = utils.NewCGRReply(&evntRply, err); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if self.cgrCfg.DiameterAgentCfg().CreateCDR &&
|
||||
(!self.cgrCfg.DiameterAgentCfg().CDRRequiresSession || err == nil || !strings.HasSuffix(err.Error(), utils.ErrNoActiveSession.Error())) { // Check if CDR requires session
|
||||
if errCdr := self.smg.Call("SMGenericV1.ProcessCDR", smgEv, &rpl); errCdr != nil {
|
||||
if da.cgrCfg.DiameterAgentCfg().CreateCDR &&
|
||||
(!da.cgrCfg.DiameterAgentCfg().CDRRequiresSession || err == nil ||
|
||||
!strings.HasSuffix(err.Error(), utils.ErrNoActiveSession.Error())) { // Check if CDR requires session
|
||||
if errCdr := da.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &rpl); errCdr != nil {
|
||||
err = errCdr
|
||||
procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()}
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
/*if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v, API error: %s", ccr.diamMessage, err))
|
||||
switch { // Prettify some errors
|
||||
case strings.HasSuffix(err.Error(), utils.ErrAccountNotFound.Error()):
|
||||
processorVars[CGRError] = utils.ErrAccountNotFound.Error()
|
||||
procVars[CGRError] = utils.ErrAccountNotFound.Error()
|
||||
case strings.HasSuffix(err.Error(), utils.ErrUserNotFound.Error()):
|
||||
processorVars[CGRError] = utils.ErrUserNotFound.Error()
|
||||
procVars[CGRError] = utils.ErrUserNotFound.Error()
|
||||
case strings.HasSuffix(err.Error(), utils.ErrInsufficientCredit.Error()):
|
||||
processorVars[CGRError] = utils.ErrInsufficientCredit.Error()
|
||||
procVars[CGRError] = utils.ErrInsufficientCredit.Error()
|
||||
case strings.HasSuffix(err.Error(), utils.ErrAccountDisabled.Error()):
|
||||
processorVars[CGRError] = utils.ErrAccountDisabled.Error()
|
||||
procVars[CGRError] = utils.ErrAccountDisabled.Error()
|
||||
case strings.HasSuffix(err.Error(), utils.ErrRatingPlanNotFound.Error()):
|
||||
processorVars[CGRError] = utils.ErrRatingPlanNotFound.Error()
|
||||
procVars[CGRError] = utils.ErrRatingPlanNotFound.Error()
|
||||
case strings.HasSuffix(err.Error(), utils.ErrUnauthorizedDestination.Error()):
|
||||
processorVars[CGRError] = utils.ErrUnauthorizedDestination.Error()
|
||||
procVars[CGRError] = utils.ErrUnauthorizedDestination.Error()
|
||||
default: // Unknown error
|
||||
processorVars[CGRError] = err.Error()
|
||||
processorVars[CGRResultCode] = strconv.Itoa(DiameterRatingFailed)
|
||||
procVars[CGRError] = err.Error()
|
||||
procVars[CGRResultCode] = strconv.Itoa(DiameterRatingFailed)
|
||||
}
|
||||
}
|
||||
if maxUsage < 0 {
|
||||
maxUsage = 0
|
||||
}
|
||||
if prevMaxUsageStr, hasKey := processorVars[CGRMaxUsage]; hasKey {
|
||||
*/
|
||||
/*if prevMaxUsageStr, hasKey := procVars[CGRMaxUsage]; hasKey {
|
||||
prevMaxUsage, _ := utils.ParseDurationWithNanosecs(prevMaxUsageStr)
|
||||
if prevMaxUsage < maxUsage {
|
||||
maxUsage = prevMaxUsage
|
||||
}
|
||||
}
|
||||
processorVars[CGRMaxUsage] = strconv.FormatInt(maxUsage.Nanoseconds(), 10)
|
||||
*/
|
||||
}
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, processorVars[CGRResultCode],
|
||||
false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
diamCode, _ := procVars.valAsString(CGRResultCode)
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, diamCode,
|
||||
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := cca.SetProcessorAVPs(reqProcessor, processorVars); err != nil {
|
||||
if err := cca.SetProcessorAVPs(reqProcessor, procVars); err != nil {
|
||||
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
|
||||
false, self.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
|
||||
return false, err
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err))
|
||||
@@ -217,9 +246,9 @@ func (self *DiameterAgent) handlerCCR(c diam.Conn, m *diam.Message) {
|
||||
}
|
||||
cca := NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
|
||||
var processed, lclProcessed bool
|
||||
processorVars := make(map[string]string) // Shared between processors
|
||||
procVars := make(processorVars) // Shared between processors
|
||||
for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors {
|
||||
lclProcessed, err = self.processCCR(ccr, reqProcessor, processorVars, cca)
|
||||
lclProcessed, err = self.processCCR(ccr, reqProcessor, procVars, cca)
|
||||
if lclProcessed { // Process local so we don't overwrite globally
|
||||
processed = lclProcessed
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func TestDmtAgentPopulateCCTotalOctets(t *testing.T) {
|
||||
ccr.diamMessage = ccr.AsBareDiameterMessage()
|
||||
cca := NewBareCCAFromCCR(ccr, "cgr-da", "cgrates.org")
|
||||
if err := cca.SetProcessorAVPs(daRP,
|
||||
map[string]string{CGRError: "", CGRMaxUsage: "153600"}); err != nil {
|
||||
processorVars{CGRError: "", CGRMaxUsage: "153600"}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if avps, err := cca.diamMessage.FindAVPsWithPath([]interface{}{
|
||||
@@ -207,12 +207,14 @@ func TestDmtAgentResetStorDb(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// Start CGR Engine
|
||||
func TestDmtAgentStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(daCfgPath, 4000); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Connect rpc client to rater
|
||||
func TestDmtAgentApierRpcConn(t *testing.T) {
|
||||
|
||||
@@ -179,7 +179,7 @@ func avpValAsString(a *diam.AVP) string {
|
||||
}
|
||||
|
||||
// Handler for meta functions
|
||||
func metaHandler(m *diam.Message, processorVars map[string]string,
|
||||
func metaHandler(m *diam.Message, procVars processorVars,
|
||||
tag, arg string, dur time.Duration) (string, error) {
|
||||
switch tag {
|
||||
case META_CCR_USAGE:
|
||||
@@ -225,9 +225,9 @@ func metaHandler(m *diam.Message, processorVars map[string]string,
|
||||
|
||||
// metaValueExponent will multiply the float value with the exponent provided.
|
||||
// Expects 2 arguments in template separated by |
|
||||
func metaValueExponent(m *diam.Message, processorVars map[string]string,
|
||||
func metaValueExponent(m *diam.Message, procVars processorVars,
|
||||
argsTpl utils.RSRFields, roundingDecimals int) (string, error) {
|
||||
valStr := composedFieldvalue(m, argsTpl, 0, processorVars)
|
||||
valStr := composedFieldvalue(m, argsTpl, 0, procVars)
|
||||
handlerArgs := strings.Split(valStr, utils.HandlerArgSep)
|
||||
if len(handlerArgs) != 2 {
|
||||
return "", errors.New("Unexpected number of arguments")
|
||||
@@ -244,9 +244,9 @@ func metaValueExponent(m *diam.Message, processorVars map[string]string,
|
||||
return strconv.FormatFloat(utils.Round(res, roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
|
||||
}
|
||||
|
||||
func metaSum(m *diam.Message, processorVars map[string]string,
|
||||
func metaSum(m *diam.Message, procVars processorVars,
|
||||
argsTpl utils.RSRFields, passAtIndex, roundingDecimals int) (string, error) {
|
||||
valStr := composedFieldvalue(m, argsTpl, passAtIndex, processorVars)
|
||||
valStr := composedFieldvalue(m, argsTpl, passAtIndex, procVars)
|
||||
handlerArgs := strings.Split(valStr, utils.HandlerArgSep)
|
||||
var summed float64
|
||||
for _, arg := range handlerArgs {
|
||||
@@ -275,12 +275,13 @@ func avpsWithPath(m *diam.Message, rsrFld *utils.RSRField) ([]*diam.AVP, error)
|
||||
splitIntoInterface(rsrFld.Id, utils.HIERARCHY_SEP), dict.UndefinedVendorID)
|
||||
}
|
||||
|
||||
func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, processorVars map[string]string) (bool, int) {
|
||||
func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, procVars processorVars) (bool, int) {
|
||||
if fieldFilter == nil {
|
||||
return true, 0
|
||||
}
|
||||
if val, hasIt := processorVars[fieldFilter.Id]; hasIt { // ProcessorVars have priority
|
||||
if fieldFilter.FilterPasses(val) {
|
||||
if val, hasIt := procVars[fieldFilter.Id]; hasIt { // ProcessorVars have priority
|
||||
valStr, _ := utils.CastFieldIfToString(val)
|
||||
if fieldFilter.FilterPasses(valStr) {
|
||||
return true, 0
|
||||
}
|
||||
return false, 0
|
||||
@@ -302,14 +303,15 @@ func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, processorVa
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func composedFieldvalue(m *diam.Message, outTpl utils.RSRFields, avpIdx int, processorVars map[string]string) string {
|
||||
func composedFieldvalue(m *diam.Message, outTpl utils.RSRFields, avpIdx int, procVars processorVars) string {
|
||||
var outVal string
|
||||
for _, rsrTpl := range outTpl {
|
||||
if rsrTpl.IsStatic() {
|
||||
outVal += rsrTpl.ParseValue("")
|
||||
} else {
|
||||
if val, hasIt := processorVars[rsrTpl.Id]; hasIt { // ProcessorVars have priority
|
||||
outVal += rsrTpl.ParseValue(val)
|
||||
if val, hasIt := procVars[rsrTpl.Id]; hasIt { // ProcessorVars have priority
|
||||
valStr, _ := utils.CastFieldIfToString(val)
|
||||
outVal += rsrTpl.ParseValue(valStr)
|
||||
continue
|
||||
}
|
||||
matchingAvps, err := avpsWithPath(m, rsrTpl)
|
||||
@@ -380,13 +382,13 @@ func serializeAVPValueFromString(dictAVP *dict.AVP, valStr, timezone string) ([]
|
||||
}
|
||||
|
||||
func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField,
|
||||
extraParam interface{}, processorVars map[string]string) (fmtValOut string, err error) {
|
||||
extraParam interface{}, procVars processorVars) (fmtValOut string, err error) {
|
||||
var outVal string
|
||||
passAtIndex := -1
|
||||
passedAllFilters := true
|
||||
for _, fldFilter := range cfgFld.FieldFilter {
|
||||
var pass bool
|
||||
if pass, passAtIndex = passesFieldFilter(m, fldFilter, processorVars); !pass {
|
||||
if pass, passAtIndex = passesFieldFilter(m, fldFilter, procVars); !pass {
|
||||
passedAllFilters = false
|
||||
break
|
||||
}
|
||||
@@ -406,19 +408,19 @@ func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField,
|
||||
case utils.META_HANDLER:
|
||||
switch cfgFld.HandlerId {
|
||||
case META_VALUE_EXPONENT:
|
||||
outVal, err = metaValueExponent(m, processorVars, cfgFld.Value, 10) // FixMe: add here configured number of decimals
|
||||
outVal, err = metaValueExponent(m, procVars, cfgFld.Value, 10) // FixMe: add here configured number of decimals
|
||||
case META_SUM:
|
||||
outVal, err = metaSum(m, processorVars, cfgFld.Value, passAtIndex, 10)
|
||||
outVal, err = metaSum(m, procVars, cfgFld.Value, passAtIndex, 10)
|
||||
default:
|
||||
outVal, err = metaHandler(m, processorVars, cfgFld.HandlerId, cfgFld.Layout, extraParam.(time.Duration))
|
||||
outVal, err = metaHandler(m, procVars, cfgFld.HandlerId, cfgFld.Layout, extraParam.(time.Duration))
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Diameter> Ignoring processing of metafunction: %s, error: %s", cfgFld.HandlerId, err.Error()))
|
||||
}
|
||||
}
|
||||
case utils.META_COMPOSED:
|
||||
outVal = composedFieldvalue(m, cfgFld.Value, 0, processorVars)
|
||||
outVal = composedFieldvalue(m, cfgFld.Value, 0, procVars)
|
||||
case utils.MetaGrouped: // GroupedAVP
|
||||
outVal = composedFieldvalue(m, cfgFld.Value, passAtIndex, processorVars)
|
||||
outVal = composedFieldvalue(m, cfgFld.Value, passAtIndex, procVars)
|
||||
}
|
||||
if fmtValOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Diameter> Error when processing field template with tag: %s, error: %s", cfgFld.Tag, err.Error()))
|
||||
@@ -692,7 +694,7 @@ func (self *CCA) AsDiameterMessage() *diam.Message {
|
||||
}
|
||||
|
||||
// SetProcessorAVPs will add AVPs to self.diameterMessage based on template defined in processor.CCAFields
|
||||
func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, processorVars map[string]string) error {
|
||||
func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, processorVars processorVars) error {
|
||||
for _, cfgFld := range reqProcessor.CCAFields {
|
||||
fmtOut, err := fieldOutVal(self.ccrMessage, cfgFld, nil, processorVars)
|
||||
if err == ErrFilterNotPassing { // Field not in or filter not passing, try match in answer
|
||||
|
||||
@@ -197,7 +197,8 @@ func TestFieldOutVal(t *testing.T) {
|
||||
t.Error("Should have error")
|
||||
}
|
||||
eOut = "360"
|
||||
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), map[string]string{"CGRError": "INSUFFICIENT_CREDIT"}); err != nil {
|
||||
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0),
|
||||
processorVars{"CGRError": "INSUFFICIENT_CREDIT"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if fldOut != eOut {
|
||||
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
|
||||
@@ -423,7 +424,7 @@ func TestCCASetProcessorAVPs(t *testing.T) {
|
||||
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type
|
||||
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
|
||||
}})
|
||||
if err := cca.SetProcessorAVPs(reqProcessor, map[string]string{}); err != nil {
|
||||
if err := cca.SetProcessorAVPs(reqProcessor, processorVars{}); err != nil {
|
||||
t.Error(err)
|
||||
} else if ccaMsg := cca.AsDiameterMessage(); !reflect.DeepEqual(eMessage, ccaMsg) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eMessage, ccaMsg)
|
||||
|
||||
@@ -172,6 +172,26 @@ func (pv processorVars) asV1TerminateSessionArgs(cgrEv *utils.CGREvent) (args *s
|
||||
return
|
||||
}
|
||||
|
||||
func (pv processorVars) asV1ProcessEventArgs(cgrEv *utils.CGREvent) (args *sessions.V1ProcessEventArgs) {
|
||||
args = &sessions.V1ProcessEventArgs{ // defaults
|
||||
Debit: true,
|
||||
CGREvent: *cgrEv,
|
||||
}
|
||||
if !pv.hasSubsystems() {
|
||||
return
|
||||
}
|
||||
if !pv.hasVar(utils.MetaAccounts) {
|
||||
args.Debit = false
|
||||
}
|
||||
if pv.hasVar(utils.MetaResources) {
|
||||
args.AllocateResources = true
|
||||
}
|
||||
if pv.hasVar(utils.MetaAttributes) {
|
||||
args.GetAttributes = true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// radAttrVendorFromPath returns AttributenName and VendorName from path
|
||||
// path should be the form attributeName or vendorName/attributeName
|
||||
func attrVendorFromPath(path string) (attrName, vendorName string) {
|
||||
|
||||
@@ -199,10 +199,9 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor,
|
||||
}
|
||||
if ra.cgrCfg.RadiusAgentCfg().CreateCDR {
|
||||
if errCdr := ra.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &rpl); errCdr != nil {
|
||||
procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()}
|
||||
err = errCdr
|
||||
procVars[utils.MetaCGRReply] = &utils.CGRReply{utils.Error: err.Error()}
|
||||
}
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
@@ -211,6 +210,7 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor,
|
||||
err = fmt.Errorf("unsupported radius request type: <%s>", procVars[MetaRadReqType])
|
||||
}
|
||||
}
|
||||
|
||||
if err := radReplyAppendAttributes(reply, procVars, reqProcessor.ReplyFields); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@@ -292,9 +292,10 @@ func main() {
|
||||
if err = tpReader.LoadAll(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if *verbose {
|
||||
/*if *verbose {
|
||||
tpReader.ShowStatistics()
|
||||
}
|
||||
*/
|
||||
if *dryRun { // We were just asked to parse the data, not saving it
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1856,6 +1856,27 @@ type V1ProcessEventReply struct {
|
||||
Attributes *engine.AttrSProcessEventReply
|
||||
}
|
||||
|
||||
// AsCGRReply is part of utils.CGRReplier interface
|
||||
func (v1Rply *V1ProcessEventReply) AsCGRReply() (cgrReply utils.CGRReply, err error) {
|
||||
cgrReply = make(map[string]interface{})
|
||||
if v1Rply.MaxUsage != nil {
|
||||
cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage
|
||||
}
|
||||
if v1Rply.ResourceAllocation != nil {
|
||||
cgrReply[utils.CapResourceAllocation] = *v1Rply.ResourceAllocation
|
||||
}
|
||||
if v1Rply.Attributes != nil {
|
||||
attrs := make(map[string]interface{})
|
||||
for _, fldName := range v1Rply.Attributes.AlteredFields {
|
||||
if v1Rply.Attributes.CGREvent.HasField(fldName) {
|
||||
attrs[fldName] = v1Rply.Attributes.CGREvent.Event[fldName]
|
||||
}
|
||||
}
|
||||
cgrReply[utils.CapAttributes] = attrs
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Called on session end, should send the CDR to CDRS
|
||||
func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
|
||||
args *V1ProcessEventArgs, rply *V1ProcessEventReply) (err error) {
|
||||
|
||||
Reference in New Issue
Block a user