Stripped diameter1 code

This commit is contained in:
DanB
2018-09-18 18:39:32 +02:00
parent 867841361a
commit b418cd5303
6 changed files with 26 additions and 2774 deletions

View File

@@ -21,14 +21,9 @@ package agents
import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/fiorix/go-diameter/diam"
@@ -36,16 +31,20 @@ import (
"github.com/fiorix/go-diameter/diam/sm"
)
func NewDiameterAgent(cgrCfg *config.CGRConfig, sessionS rpcclient.RpcClientConnection,
pubsubs rpcclient.RpcClientConnection) (*DiameterAgent, error) {
da := &DiameterAgent{cgrCfg: cgrCfg, sessionS: sessionS,
pubsubs: pubsubs, connMux: new(sync.Mutex)}
if reflect.ValueOf(da.pubsubs).IsNil() {
da.pubsubs = nil // Empty it so we can check it later
func NewDiameterAgent(cgrCfg *config.CGRConfig,
sessionS, thdS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
sessionS = nil
}
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
da := &DiameterAgent{
cgrCfg: cgrCfg, sessionS: sessionS,
thdS: thdS, connMux: new(sync.Mutex)}
dictsDir := cgrCfg.DiameterAgentCfg().DictionariesDir
if len(dictsDir) != 0 {
if err := loadDictionaries(dictsDir, "DiameterAgent"); err != nil {
if err := loadDictionaries(dictsDir, utils.DiameterAgent); err != nil {
return nil, err
}
}
@@ -54,11 +53,16 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, sessionS rpcclient.RpcClientConn
type DiameterAgent struct {
cgrCfg *config.CGRConfig
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SMG component
pubsubs rpcclient.RpcClientConnection // Connection towards CGR-PubSub component
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
thdS rpcclient.RpcClientConnection // Connection towards CGR-ThresholdS component
connMux *sync.Mutex // Protect connection for read/write
}
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
func (self *DiameterAgent) ListenAndServe() error {
return diam.ListenAndServe(self.cgrCfg.DiameterAgentCfg().Listen, self.handlers(), nil)
}
// Creates the message handlers
func (self *DiameterAgent) handlers() diam.Handler {
settings := &sm.Settings{
@@ -69,201 +73,15 @@ func (self *DiameterAgent) handlers() diam.Handler {
FirmwareRevision: datatype.Unsigned32(utils.DIAMETER_FIRMWARE_REVISION),
}
dSM := sm.New(settings)
dSM.HandleFunc("CCR", self.handleCCR)
dSM.HandleFunc("ALL", self.handleALL)
dSM.HandleFunc("ALL", self.handleALL) // route all commands to one dispatcher
go func() {
for err := range dSM.ErrorReports() {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> StateMachine error: %+v", err))
utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err))
}
}()
return dSM
}
func (da DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor,
procVars processorVars, cca *CCA) (processed bool, err error) {
passesAllFilters := true
for _, fldFilter := range reqProcessor.RequestFilter {
if passes, _ := passesFieldFilter(ccr.diamMessage, fldFilter, nil); !passes {
passesAllFilters = false
}
}
if !passesAllFilters { // Not going with this processor further
return false, nil
}
if reqProcessor.DryRun { // DryRun should log the matching processor as well as the received CCR
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> RequestProcessor: %s", reqProcessor.Id))
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> CCR message: %s", ccr.diamMessage))
}
if !reqProcessor.AppendCCA {
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
procVars = make(processorVars)
}
smgEv, err := ccr.AsMapIface(reqProcessor.CCRFields)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err))
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v messageSetAVPsWithPath, error: %s", cca.diamMessage, err.Error()))
return false, err
}
return false, ErrDiameterRatingFailed
}
if len(reqProcessor.Flags) != 0 {
smgEv[utils.CGRFlags] = reqProcessor.Flags.String() // Populate CGRFlags automatically
for flag, val := range reqProcessor.Flags {
procVars[flag] = val
}
}
if reqProcessor.PublishEvent && da.pubsubs != nil {
evt, err := engine.NewMapEvent(smgEv).AsMapString(nil)
if err != nil {
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
return false, err
}
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed converting SMGEvent to pubsub one, error: %s", ccr.diamMessage, err))
return false, ErrDiameterRatingFailed
}
var reply string
if err := da.pubsubs.Call("PubSubV1.Publish", engine.CgrEvent(evt), &reply); err != nil {
*cca = *NewBareCCAFromCCR(ccr, da.cgrCfg.DiameterAgentCfg().OriginHost, da.cgrCfg.DiameterAgentCfg().OriginRealm)
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
return false, err
}
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed publishing event, error: %s", ccr.diamMessage, err))
return false, ErrDiameterRatingFailed
}
}
if reqProcessor.DryRun { // DryRun does not send over network
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> SMGenericEvent: %+v", smgEv))
procVars[CGRResultCode] = strconv.Itoa(diam.LimitedSuccess)
} else { // Query SessionS over APIs
var tnt string
if tntIf, has := smgEv[utils.Tenant]; has {
if tntStr, canCast := utils.CastFieldIfToString(tntIf); canCast {
tnt = tntStr
}
}
cgrEv := &utils.CGREvent{
Tenant: utils.FirstNonEmpty(tnt,
config.CgrConfig().DefaultTenant),
ID: "dmt:" + utils.UUIDSha1Prefix(),
Time: utils.TimePointer(time.Now()),
Event: smgEv,
}
switch ccr.CCRequestType {
case 1:
var initReply sessions.V1InitSessionReply
err = da.sessionS.Call(utils.SessionSv1InitiateSession,
procVars.asV1InitSessionArgs(cgrEv), &initReply)
if procVars[utils.MetaCGRReply], err = NewCGRReply(&initReply, err); err != nil {
return
}
case 2:
var updateReply sessions.V1UpdateSessionReply
err = da.sessionS.Call(utils.SessionSv1UpdateSession,
procVars.asV1UpdateSessionArgs(cgrEv), &updateReply)
if procVars[utils.MetaCGRReply], err = NewCGRReply(&updateReply, err); err != nil {
return
}
case 3, 4: // Handle them together since we generate CDR for them
var rpl string
if ccr.CCRequestType == 3 {
if err = da.sessionS.Call(utils.SessionSv1TerminateSession,
procVars.asV1TerminateSessionArgs(cgrEv), &rpl); err != nil {
procVars[utils.MetaCGRReply], _ = NewCGRReply(nil, err)
}
} else if ccr.CCRequestType == 4 {
var evntRply sessions.V1ProcessEventReply
err = da.sessionS.Call(utils.SessionSv1ProcessEvent,
procVars.asV1ProcessEventArgs(cgrEv), &evntRply)
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
} else if evntRply.MaxUsage != nil {
cgrEv.Event[utils.Usage] = *evntRply.MaxUsage // make sure the CDR reflects the debit
}
if procVars[utils.MetaCGRReply], err = NewCGRReply(&evntRply, err); err != nil {
return
}
}
if da.cgrCfg.DiameterAgentCfg().CreateCDR &&
(!da.cgrCfg.DiameterAgentCfg().CDRRequiresSession || err == nil ||
!strings.HasSuffix(err.Error(), utils.ErrNoActiveSession.Error())) { // Check if CDR requires session
if errCdr := da.sessionS.Call(utils.SessionSv1ProcessCDR, cgrEv, &rpl); errCdr != nil {
err = errCdr
procVars[utils.MetaCGRReply], _ = NewCGRReply(nil, err)
}
}
}
}
diamCode := strconv.Itoa(diam.Success)
if procVars.hasVar(CGRResultCode) {
diamCode, _ = procVars.valAsString(CGRResultCode)
}
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, diamCode,
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
return false, err
}
if err := cca.SetProcessorAVPs(reqProcessor, procVars); err != nil {
if err := messageSetAVPsWithPath(cca.diamMessage, []interface{}{"Result-Code"}, strconv.Itoa(DiameterRatingFailed),
false, da.cgrCfg.DiameterAgentCfg().Timezone); err != nil {
return false, err
}
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err))
return false, ErrDiameterRatingFailed
}
if reqProcessor.DryRun {
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> CCA message: %s", cca.diamMessage))
}
return true, nil
}
func (self *DiameterAgent) handlerCCR(c diam.Conn, m *diam.Message) {
ccr, err := NewCCRFromDiameterMessage(m, self.cgrCfg.DiameterAgentCfg().DebitInterval)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Unmarshaling message: %s, error: %s", m, err))
return
}
cca := NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm)
var processed, lclProcessed bool
procVars := make(processorVars) // Shared between processors
for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors {
lclProcessed, err = self.processCCR(ccr, reqProcessor, procVars, cca)
if lclProcessed { // Process local so we don't overwrite globally
processed = lclProcessed
}
if err != nil || (lclProcessed && !reqProcessor.ContinueOnSuccess) {
break
}
}
if err != nil && err != ErrDiameterRatingFailed {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err))
return
} else if !processed {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> No request processor enabled for CCR: %s, ignoring request", ccr.diamMessage))
return
}
self.connMux.Lock()
defer self.connMux.Unlock()
if _, err := cca.AsDiameterMessage().WriteTo(c); err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Failed to write message to %s: %s\n%s\n", c.RemoteAddr(), err, cca.AsDiameterMessage()))
return
}
}
// Simply dispatch the handling in goroutines
// Could be futher improved with rate control
func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) {
go self.handlerCCR(c, m)
}
func (self *DiameterAgent) handleALL(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(fmt.Sprintf("<DiameterAgent> Received unexpected message from %s:\n%s", c.RemoteAddr(), m))
}
func (self *DiameterAgent) ListenAndServe() error {
return diam.ListenAndServe(self.cgrCfg.DiameterAgentCfg().Listen, self.handlers(), nil)
utils.Logger.Warning(fmt.Sprintf("<%s> received unexpected message from %s:\n%s", utils.DiameterAgent, c.RemoteAddr(), m))
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,120 +0,0 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
/*
import (
"os/exec"
"path"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var cgrRater1Cmd, cgrSmg1Cmd *exec.Cmd
func TestHaPoolInitCfg(t *testing.T) {
daCfgPath = path.Join(*dataDir, "conf", "samples", "hapool", "cgrrater1")
// Init config first
var err error
daCfg, err = config.NewCGRConfigFromFolder(daCfgPath)
if err != nil {
t.Error(err)
}
daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(daCfg)
}
// Remove data in both rating and accounting db
func TestHaPoolResetDataDb(t *testing.T) {
TestDmtAgentResetDataDb(t)
}
// Wipe out the cdr database
func TestHaPoolResetStorDb(t *testing.T) {
TestDmtAgentResetStorDb(t)
}
// Start CGR Engine
func TestHaPoolStartEngine(t *testing.T) {
engine.KillEngine(*waitRater) // just to make sure
var err error
cgrRater1 := path.Join(*dataDir, "conf", "samples", "hapool", "cgrrater1")
if cgrRater1Cmd, err = engine.StartEngine(cgrRater1, *waitRater); err != nil {
t.Fatal("cgrRater1: ", err)
}
cgrRater2 := path.Join(*dataDir, "conf", "samples", "hapool", "cgrrater2")
if _, err = engine.StartEngine(cgrRater2, *waitRater); err != nil {
t.Fatal("cgrRater2: ", err)
}
cgrSmg1 := path.Join(*dataDir, "conf", "samples", "hapool", "cgrsmg1")
if cgrSmg1Cmd, err = engine.StartEngine(cgrSmg1, *waitRater); err != nil {
t.Fatal("cgrSmg1: ", err)
}
cgrSmg2 := path.Join(*dataDir, "conf", "samples", "hapool", "cgrsmg2")
if _, err = engine.StartEngine(cgrSmg2, *waitRater); err != nil {
t.Fatal("cgrSmg2: ", err)
}
cgrDa := path.Join(*dataDir, "conf", "samples", "hapool", "dagent")
if _, err = engine.StartEngine(cgrDa, *waitRater); err != nil {
t.Fatal("cgrDa: ", err)
}
}
// Connect rpc client to rater
func TestHaPoolApierRpcConn(t *testing.T) {
TestDmtAgentApierRpcConn(t)
}
// Load the tariff plan, creating accounts and their balances
func TestHaPoolTPFromFolder(t *testing.T) {
TestDmtAgentTPFromFolder(t)
}
// cgr-console 'cost Category="call" Tenant="cgrates.org" Subject="1001" Destination="1004" TimeStart="2015-11-07T08:42:26Z" TimeEnd="2015-11-07T08:47:26Z"'
func TestHaPoolSendCCRInit(t *testing.T) {
TestDmtAgentSendCCRInit(t)
}
// cgr-console 'cost Category="call" Tenant="cgrates.org" Subject="1001" Destination="1004" TimeStart="2015-11-07T08:42:26Z" TimeEnd="2015-11-07T08:52:26Z"'
func TestHaPoolSendCCRUpdate(t *testing.T) {
TestDmtAgentSendCCRUpdate(t)
}
// cgr-console 'cost Category="call" Tenant="cgrates.org" Subject="1001" Destination="1004" TimeStart="2015-11-07T08:42:26Z" TimeEnd="2015-11-07T08:57:26Z"'
func TestHaPoolSendCCRUpdate2(t *testing.T) {
TestDmtAgentSendCCRUpdate2(t)
}
func TestHaPoolSendCCRTerminate(t *testing.T) {
TestDmtAgentSendCCRTerminate(t)
}
func TestHaPoolCdrs(t *testing.T) {
TestDmtAgentCdrs(t)
}
func TestHaPoolStopEngine(t *testing.T) {
TestDmtAgentStopEngine(t)
}
*/

View File

@@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
@@ -41,6 +42,9 @@ var (
haCfg *config.CGRConfig
haRPC *rpc.Client
httpC *http.Client // so we can cache the connection
err error
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
)
func TestHAitInitCfg(t *testing.T) {

View File

@@ -18,204 +18,21 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
/*
Build various type of packets here
*/
import (
"errors"
"fmt"
"math"
"math/rand"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/diam"
"github.com/fiorix/go-diameter/diam/avp"
"github.com/fiorix/go-diameter/diam/datatype"
"github.com/fiorix/go-diameter/diam/dict"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
const (
META_CCR_USAGE = "*ccr_usage"
META_VALUE_EXPONENT = "*value_exponent"
META_SUM = "*sum"
DIAMETER_CCR = "DIAMETER_CCR"
DiameterRatingFailed = 5031
CGRError = "CGRError"
CGRMaxUsage = "CGRMaxUsage"
CGRResultCode = "CGRResultCode"
)
var (
ErrFilterNotPassing = errors.New("Filter not passing")
ErrDiameterRatingFailed = errors.New("Diameter rating failed")
)
// processorVars will hold various variables using during request processing
// here so we can define methods on it
type processorVars map[string]interface{}
// hasSubsystems will return true on single subsystem being present in processorVars
func (pv processorVars) hasSubsystems() (has bool) {
for _, k := range []string{utils.MetaAccounts, utils.MetaResources,
utils.MetaSuppliers, utils.MetaAttributes} {
if _, has = pv[k]; has {
return
}
}
return
}
func (pv processorVars) hasVar(k string) (has bool) {
_, has = pv[k]
return
}
// valAsInterface returns the string value for fldName
func (pv processorVars) valAsInterface(fldPath string) (val interface{}, err error) {
fldName := fldPath
if strings.HasPrefix(fldPath, utils.MetaCGRReply) {
fldName = utils.MetaCGRReply
}
if !pv.hasVar(fldName) {
err = errors.New("not found")
return
}
return config.NewNavigableMap(pv).FieldAsInterface(strings.Split(fldPath, utils.HIERARCHY_SEP))
}
// valAsString returns the string value for fldName
// returns empty if fldName not found
func (pv processorVars) valAsString(fldPath string) (val string, err error) {
fldName := fldPath
if strings.HasPrefix(fldPath, utils.MetaCGRReply) {
fldName = utils.MetaCGRReply
}
if !pv.hasVar(fldName) {
return "", utils.ErrNotFoundNoCaps
}
return config.NewNavigableMap(pv).FieldAsString(strings.Split(fldPath, utils.HIERARCHY_SEP))
}
// asV1AuthorizeArgs returns the arguments needed by SessionSv1.AuthorizeEvent
func (pv processorVars) asV1AuthorizeArgs(cgrEv *utils.CGREvent) (args *sessions.V1AuthorizeArgs) {
args = &sessions.V1AuthorizeArgs{ // defaults
GetMaxUsage: true,
CGREvent: *cgrEv,
}
if !pv.hasSubsystems() {
return
}
if !pv.hasVar(utils.MetaAccounts) {
args.GetMaxUsage = false
}
if pv.hasVar(utils.MetaResources) {
args.AuthorizeResources = true
}
if pv.hasVar(utils.MetaSuppliers) {
args.GetSuppliers = true
}
if pv.hasVar(utils.MetaAttributes) {
args.GetAttributes = true
}
return
}
// asV1InitSessionArgs returns the arguments used in SessionSv1.InitSession
func (pv processorVars) asV1InitSessionArgs(cgrEv *utils.CGREvent) (args *sessions.V1InitSessionArgs) {
args = &sessions.V1InitSessionArgs{ // defaults
InitSession: true,
CGREvent: *cgrEv,
}
if !pv.hasSubsystems() {
return
}
if !pv.hasVar(utils.MetaAccounts) {
args.InitSession = false
}
if pv.hasVar(utils.MetaResources) {
args.AllocateResources = true
}
if pv.hasVar(utils.MetaAttributes) {
args.GetAttributes = true
}
return
}
// asV1UpdateSessionArgs returns the arguments used in SessionSv1.InitSession
func (pv processorVars) asV1UpdateSessionArgs(cgrEv *utils.CGREvent) (args *sessions.V1UpdateSessionArgs) {
args = &sessions.V1UpdateSessionArgs{ // defaults
UpdateSession: true,
CGREvent: *cgrEv,
}
if !pv.hasSubsystems() {
return
}
if !pv.hasVar(utils.MetaAccounts) {
args.UpdateSession = false
}
if pv.hasVar(utils.MetaAttributes) {
args.GetAttributes = true
}
return
}
// asV1TerminateSessionArgs returns the arguments used in SMGv1.TerminateSession
func (pv processorVars) asV1TerminateSessionArgs(cgrEv *utils.CGREvent) (args *sessions.V1TerminateSessionArgs) {
args = &sessions.V1TerminateSessionArgs{ // defaults
TerminateSession: true,
CGREvent: *cgrEv,
}
if !pv.hasSubsystems() {
return
}
if !pv.hasVar(utils.MetaAccounts) {
args.TerminateSession = false
}
if pv.hasVar(utils.MetaResources) {
args.ReleaseResources = true
}
return
}
func (pv processorVars) asV1ProcessEventArgs(cgrEv *utils.CGREvent) (args *sessions.V1ProcessEventArgs) {
args = &sessions.V1ProcessEventArgs{ // defaults
Debit: true,
CGREvent: *cgrEv,
}
if !pv.hasSubsystems() {
return
}
if !pv.hasVar(utils.MetaAccounts) {
args.Debit = false
}
if pv.hasVar(utils.MetaResources) {
args.AllocateResources = true
}
if pv.hasVar(utils.MetaAttributes) {
args.GetAttributes = true
}
return
}
func loadDictionaries(dictsDir, componentId string) error {
fi, err := os.Stat(dictsDir)
if err != nil {
if strings.HasSuffix(err.Error(), "no such file or directory") {
return fmt.Errorf("<DiameterAgent> Invalid dictionaries folder: <%s>", dictsDir)
return fmt.Errorf("<%s> Invalid dictionaries folder: <%s>", componentId, dictsDir)
}
return err
} else if !fi.IsDir() { // If config dir defined, needs to exist
@@ -240,651 +57,4 @@ func loadDictionaries(dictsDir, componentId string) error {
}
return nil
})
}
// Returns reqType, requestNr and ccTime in seconds
func disectUsageForCCR(usage time.Duration, debitInterval time.Duration, callEnded bool) (reqType, reqNr, reqCCTime, usedCCTime int) {
usageSecs := usage.Seconds()
debitIntervalSecs := debitInterval.Seconds()
reqType = 1
if usage > 0 {
reqType = 2
}
if callEnded {
reqType = 3
}
reqNr = int(usageSecs / debitIntervalSecs)
if callEnded {
reqNr += 1
}
ccTimeFloat := debitInterval.Seconds()
if callEnded {
ccTimeFloat = math.Mod(usageSecs, debitIntervalSecs)
}
if reqType == 1 { // Initial does not have usedCCTime
reqCCTime = int(ccTimeFloat)
} else if reqType == 2 {
reqCCTime = int(ccTimeFloat)
usedCCTime = int(math.Mod(usageSecs, debitIntervalSecs))
} else if reqType == 3 {
usedCCTime = int(ccTimeFloat) // Termination does not have requestCCTime
}
return
}
func usageFromCCR(reqType int, reqNr, usedCCTime int64, debitIterval time.Duration) (usage time.Duration) {
//dISecs := debitIterval.Nano()
//var ccTime int
usage = debitIterval
if reqType == 3 {
reqNr -= 1 // decrease request number to reach the real number
usage = (time.Duration(usedCCTime) * time.Second) + time.Duration(debitIterval.Nanoseconds()*reqNr)
}
return
}
// Utility function to convert from StoredCdr to CCR struct
func storedCdrToCCR(cdr *engine.CDR, originHost, originRealm string, vendorId int, productName string,
firmwareRev int, debitInterval time.Duration, callEnded bool) *CCR {
//sid := "session;" + strconv.Itoa(int(rand.Uint32()))
reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(cdr.Usage, debitInterval, callEnded)
ccr := &CCR{SessionId: cdr.CGRID, OriginHost: originHost, OriginRealm: originRealm, DestinationHost: originHost, DestinationRealm: originRealm,
AuthApplicationId: 4, ServiceContextId: cdr.ExtraFields["Service-Context-Id"], CCRequestType: reqType, CCRequestNumber: reqNr, EventTimestamp: cdr.AnswerTime,
ServiceIdentifier: 0}
ccr.SubscriptionId = make([]struct {
SubscriptionIdType int `avp:"Subscription-Id-Type"`
SubscriptionIdData string `avp:"Subscription-Id-Data"`
}, 1)
ccr.SubscriptionId[0].SubscriptionIdType = 0
ccr.SubscriptionId[0].SubscriptionIdData = cdr.Account
ccr.RequestedServiceUnit.CCTime = reqCCTime
ccr.UsedServiceUnit.CCTime = usedCCTime
ccr.ServiceInformation.INInformation.CallingPartyAddress = cdr.Account
ccr.ServiceInformation.INInformation.CalledPartyAddress = cdr.Destination
ccr.ServiceInformation.INInformation.RealCalledNumber = cdr.Destination
ccr.ServiceInformation.INInformation.ChargeFlowType = 0
ccr.ServiceInformation.INInformation.CallingVlrNumber = cdr.ExtraFields["Calling-Vlr-Number"]
ccr.ServiceInformation.INInformation.CallingCellIDOrSAI = cdr.ExtraFields["Calling-CellID-Or-SAI"]
ccr.ServiceInformation.INInformation.BearerCapability = cdr.ExtraFields["Bearer-Capability"]
ccr.ServiceInformation.INInformation.CallReferenceNumber = cdr.CGRID
ccr.ServiceInformation.INInformation.TimeZone = 0
ccr.ServiceInformation.INInformation.SSPTime = cdr.ExtraFields["SSP-Time"]
return ccr
}
// Not the cleanest but most efficient way to retrieve a string from AVP since there are string methods on all datatypes
// and the output is always in teh form "DataType{real_string}Padding:x"
func avpValAsString(a *diam.AVP) string {
dataVal := a.Data.String()
startIdx := strings.Index(dataVal, "{")
endIdx := strings.Index(dataVal, "}")
if startIdx == 0 || endIdx == 0 {
return ""
}
return dataVal[startIdx+1 : endIdx]
}
// Handler for meta functions
func metaHandler(m *diam.Message, procVars processorVars,
tag, arg string, dur time.Duration) (string, error) {
switch tag {
case META_CCR_USAGE:
var ok bool
var reqType datatype.Enumerated
var reqNr, usedUnit datatype.Unsigned32
if ccReqTypeAvp, err := m.FindAVP("CC-Request-Type", 0); err != nil {
return "", err
} else if ccReqTypeAvp == nil {
return "", errors.New("CC-Request-Type not found")
} else if reqType, ok = ccReqTypeAvp.Data.(datatype.Enumerated); !ok {
return "", fmt.Errorf("CC-Request-Type must be Enumerated and not %v", ccReqTypeAvp.Data.Type())
}
if ccReqNrAvp, err := m.FindAVP("CC-Request-Number", 0); err != nil {
return "", err
} else if ccReqNrAvp == nil {
return "", errors.New("CC-Request-Number not found")
} else if reqNr, ok = ccReqNrAvp.Data.(datatype.Unsigned32); !ok {
return "", fmt.Errorf("CC-Request-Number must be Unsigned32 and not %v", ccReqNrAvp.Data.Type())
}
switch reqType {
case datatype.Enumerated(1), datatype.Enumerated(2):
if reqUnitAVPs, err := m.FindAVPsWithPath([]interface{}{"Requested-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil {
return "", err
} else if len(reqUnitAVPs) == 0 {
return "", errors.New("Requested-Service-Unit>CC-Time not found")
} else if usedUnit, ok = reqUnitAVPs[0].Data.(datatype.Unsigned32); !ok {
return "", fmt.Errorf("Requested-Service-Unit>CC-Time must be Unsigned32 and not %v", reqUnitAVPs[0].Data.Type())
}
case datatype.Enumerated(3), datatype.Enumerated(4):
if usedUnitAVPs, err := m.FindAVPsWithPath([]interface{}{"Used-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil {
return "", err
} else if len(usedUnitAVPs) != 0 {
if usedUnit, ok = usedUnitAVPs[0].Data.(datatype.Unsigned32); !ok {
return "", fmt.Errorf("Used-Service-Unit>CC-Time must be Unsigned32 and not %v", usedUnitAVPs[0].Data.Type())
}
}
}
return usageFromCCR(int(reqType), int64(reqNr), int64(usedUnit), dur).String(), nil
}
return "", nil
}
// metaValueExponent will multiply the float value with the exponent provided.
// Expects 2 arguments in template separated by |
func metaValueExponent(m *diam.Message, procVars processorVars,
argsTpl utils.RSRFields, roundingDecimals int) (string, error) {
valStr := composedFieldvalue(m, argsTpl, 0, procVars)
handlerArgs := strings.Split(valStr, utils.HandlerArgSep)
if len(handlerArgs) != 2 {
return "", errors.New("Unexpected number of arguments")
}
val, err := strconv.ParseFloat(handlerArgs[0], 64)
if err != nil {
return "", err
}
exp, err := strconv.Atoi(handlerArgs[1])
if err != nil {
return "", err
}
res := val * math.Pow10(exp)
return strconv.FormatFloat(utils.Round(res, roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
}
func metaSum(m *diam.Message, procVars processorVars,
argsTpl utils.RSRFields, passAtIndex, roundingDecimals int) (string, error) {
valStr := composedFieldvalue(m, argsTpl, passAtIndex, procVars)
handlerArgs := strings.Split(valStr, utils.HandlerArgSep)
var summed float64
for _, arg := range handlerArgs {
val, err := strconv.ParseFloat(arg, 64)
if err != nil {
return "", err
}
summed += val
}
return strconv.FormatFloat(utils.Round(summed, roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
}
// splitIntoInterface is used to split a string into []interface{} instead of []string
func splitIntoInterface(content, sep string) []interface{} {
spltStr := strings.Split(content, sep)
spltIf := make([]interface{}, len(spltStr))
for i, val := range spltStr {
spltIf[i] = val
}
return spltIf
}
// avpsWithPath is used to find AVPs by specifying RSRField as filter
func avpsWithPath(m *diam.Message, rsrFld *utils.RSRField) ([]*diam.AVP, error) {
return m.FindAVPsWithPath(
splitIntoInterface(rsrFld.Id, utils.HIERARCHY_SEP), dict.UndefinedVendorID)
}
func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField, procVars processorVars) (bool, int) {
if fieldFilter == nil {
return true, 0
}
// check procVars before AVPs
if val, err := procVars.valAsString(fieldFilter.Id); err != utils.ErrNotFoundNoCaps {
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<RSRFilter> parsing value: <%s> as string, error: <%s>",
fieldFilter.Id, err.Error()))
return false, 0
}
if _, err := fieldFilter.Parse(val); err != nil {
return false, 0
}
return true, 0
}
avps, err := avpsWithPath(m, fieldFilter)
if err != nil {
return false, 0
}
if len(avps) == 0 { // No AVP found in request, treat it same as empty
if _, err := fieldFilter.Parse(""); err != nil {
return false, 0
}
return true, -1
}
for avpIdx, avpVal := range avps { // First match wins due to index
if _, err = fieldFilter.Parse(avpValAsString(avpVal)); err == nil {
return true, avpIdx
}
}
return false, 0
}
func composedFieldvalue(m *diam.Message, outTpl utils.RSRFields, avpIdx int, procVars processorVars) (outVal string) {
var err error
for _, rsrTpl := range outTpl {
var valToParse string
if !rsrTpl.IsStatic() { // for Static we will parse empty valToParse bellow
// check procVars before AVPs
if valToParse, err = procVars.valAsString(rsrTpl.Id); err != nil {
if err != utils.ErrNotFoundNoCaps {
utils.Logger.Warning(
fmt.Sprintf("<%s> %s", utils.DiameterAgent, err.Error()))
continue
}
// not found in processorVars, look in AVPs
// AVPs from here
matchingAvps, err := avpsWithPath(m, rsrTpl)
if err != nil || len(matchingAvps) == 0 {
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error matching AVPS: %s",
utils.DiameterAgent, err.Error()))
}
continue
}
if len(matchingAvps) <= avpIdx {
utils.Logger.Warning(
fmt.Sprintf("<%s> Cannot retrieve AVP with index %d for field template with id: %s",
utils.DiameterAgent, avpIdx, rsrTpl.Id))
continue // Not convertible, ignore
}
if matchingAvps[0].Data.Type() == diam.GroupedAVPType {
utils.Logger.Warning(
fmt.Sprintf("<%s> Value for field template with id: %s is matching a group AVP, ignoring.",
utils.DiameterAgent, rsrTpl.Id))
continue // Not convertible, ignore
}
valToParse = avpValAsString(matchingAvps[avpIdx])
}
}
if parsed, err := rsrTpl.Parse(valToParse); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> %s",
utils.DiameterAgent, err.Error()))
} else {
outVal += parsed
}
}
return
}
// Used to return the encoded value based on what AVP understands for it's type
func serializeAVPValueFromString(dictAVP *dict.AVP, valStr, timezone string) ([]byte, error) {
switch dictAVP.Data.Type {
case datatype.OctetStringType, datatype.DiameterIdentityType, datatype.DiameterURIType, datatype.IPFilterRuleType, datatype.QoSFilterRuleType, datatype.UTF8StringType:
return []byte(valStr), nil
case datatype.AddressType:
return []byte(net.ParseIP(valStr)), nil
case datatype.EnumeratedType, datatype.Integer32Type, datatype.Unsigned32Type:
i, err := strconv.Atoi(valStr)
if err != nil {
return nil, err
}
return datatype.Enumerated(i).Serialize(), nil
case datatype.Unsigned64Type, datatype.Integer64Type:
i, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return nil, err
}
return datatype.Unsigned64(i).Serialize(), nil
case datatype.Float32Type:
f, err := strconv.ParseFloat(valStr, 32)
if err != nil {
return nil, err
}
return datatype.Float32(f).Serialize(), nil
case datatype.Float64Type:
f, err := strconv.ParseFloat(valStr, 64)
if err != nil {
return nil, err
}
return datatype.Float64(f).Serialize(), nil
case datatype.GroupedType:
return nil, errors.New("GroupedType not supported for serialization")
case datatype.IPv4Type:
return datatype.IPv4(net.ParseIP(valStr)).Serialize(), nil
case datatype.TimeType:
t, err := utils.ParseTimeDetectLayout(valStr, timezone)
if err != nil {
return nil, err
}
return datatype.Time(t).Serialize(), nil
default:
return nil, fmt.Errorf("Unsupported type for serialization: %v", dictAVP.Data.Type)
}
}
func fieldOutVal(m *diam.Message, cfgFld *config.CfgCdrField,
extraParam interface{}, procVars processorVars) (fmtValOut string, err error) {
var outVal string
passAtIndex := -1
passedAllFilters := true
for _, fldFilter := range cfgFld.FieldFilter {
var pass bool
if pass, passAtIndex = passesFieldFilter(m, fldFilter, procVars); !pass {
passedAllFilters = false
break
}
}
if !passedAllFilters {
return "", ErrFilterNotPassing // Not matching field filters, will have it empty
}
if passAtIndex == -1 {
passAtIndex = 0 // No filter
}
switch cfgFld.Type {
case utils.META_FILLER:
outVal = cfgFld.Value.Id()
cfgFld.Padding = "right"
case utils.META_CONSTANT:
outVal = cfgFld.Value.Id()
case utils.META_HANDLER:
switch cfgFld.HandlerId {
case META_VALUE_EXPONENT:
outVal, err = metaValueExponent(m, procVars, cfgFld.Value, 10) // FixMe: add here configured number of decimals
case META_SUM:
outVal, err = metaSum(m, procVars, cfgFld.Value, passAtIndex, 10)
default:
outVal, err = metaHandler(m, procVars, cfgFld.HandlerId, cfgFld.Layout, extraParam.(time.Duration))
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<Diameter> Ignoring processing of metafunction: %s, error: %s", cfgFld.HandlerId, err.Error()))
}
}
case utils.META_COMPOSED:
outVal = composedFieldvalue(m, cfgFld.Value, 0, procVars)
case utils.MetaGrouped: // GroupedAVP
outVal = composedFieldvalue(m, cfgFld.Value, passAtIndex, procVars)
}
if fmtValOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width,
cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
utils.Logger.Warning(fmt.Sprintf("<Diameter> Error when processing field template with tag: %s, error: %s", cfgFld.Tag, err.Error()))
return "", err
}
return fmtValOut, nil
}
// messageAddAVPsWithPath will dynamically add AVPs into the message
// append: append to the message, on false overwrite if AVP is single or add to group if AVP is Grouped
func messageSetAVPsWithPath(m *diam.Message, path []interface{},
avpValStr string, appnd bool, timezone string) error {
if len(path) == 0 {
return errors.New("Empty path as AVP filter")
}
dictAVPs := make([]*dict.AVP, len(path)) // for each subpath, one dictionary AVP
for i, subpath := range path {
if dictAVP, err := m.Dictionary().FindAVP(m.Header.ApplicationID, subpath); err != nil {
return err
} else if dictAVP == nil {
return fmt.Errorf("Cannot find AVP with id: %s", path[len(path)-1])
} else {
dictAVPs[i] = dictAVP
}
}
if dictAVPs[len(path)-1].Data.Type == diam.GroupedAVPType {
return errors.New("Last AVP in path cannot be GroupedAVP")
}
var msgAVP *diam.AVP // Keep a reference here towards last AVP
lastAVPIdx := len(path) - 1
for i := lastAVPIdx; i >= 0; i-- {
var typeVal datatype.Type
if i == lastAVPIdx {
avpValByte, err := serializeAVPValueFromString(dictAVPs[i], avpValStr, timezone)
if err != nil {
return err
}
typeVal, err = datatype.Decode(dictAVPs[i].Data.Type, avpValByte) // Check here
if err != nil {
return err
}
} else {
typeVal = &diam.GroupedAVP{
AVP: []*diam.AVP{msgAVP}}
}
newMsgAVP := diam.NewAVP(dictAVPs[i].Code, avp.Mbit, dictAVPs[i].VendorID, typeVal) // FixMe: maybe Mbit with dictionary one
if i == lastAVPIdx-1 && !appnd { // last AVP needs to be appended in group
avps, _ := m.FindAVPsWithPath(path[:lastAVPIdx], dict.UndefinedVendorID)
if len(avps) != 0 { // Group AVP already in the message
prevGrpData := avps[len(avps)-1].Data.(*diam.GroupedAVP) // Take the last avp found to append there
prevGrpData.AVP = append(prevGrpData.AVP, msgAVP)
m.Header.MessageLength += uint32(msgAVP.Len())
return nil
}
}
msgAVP = newMsgAVP
}
if !appnd { // Not group AVP, replace the previous set one with this one
avps, _ := m.FindAVPsWithPath(path, dict.UndefinedVendorID)
if len(avps) != 0 { // Group AVP already in the message
m.Header.MessageLength -= uint32(avps[len(avps)-1].Len()) // decrease message length since we overwrite
*avps[len(avps)-1] = *msgAVP
m.Header.MessageLength += uint32(msgAVP.Len())
return nil
}
}
m.AVP = append(m.AVP, msgAVP)
m.Header.MessageLength += uint32(msgAVP.Len())
return nil
}
// debitInterval is the configured debitInterval, in sync with the diameter client one
func NewCCRFromDiameterMessage(m *diam.Message, debitInterval time.Duration) (*CCR, error) {
var ccr CCR
if err := m.Unmarshal(&ccr); err != nil {
return nil, err
}
ccr.diamMessage = m
ccr.debitInterval = debitInterval
return &ccr, nil
}
// CallControl Request
// FixMe: strip it down to mandatory bare structure format by RFC 4006
type CCR struct {
SessionId string `avp:"Session-Id"`
OriginHost string `avp:"Origin-Host"`
OriginRealm string `avp:"Origin-Realm"`
DestinationHost string `avp:"Destination-Host"`
DestinationRealm string `avp:"Destination-Realm"`
AuthApplicationId int `avp:"Auth-Application-Id"`
ServiceContextId string `avp:"Service-Context-Id"`
CCRequestType int `avp:"CC-Request-Type"`
CCRequestNumber int `avp:"CC-Request-Number"`
EventTimestamp time.Time `avp:"Event-Timestamp"`
SubscriptionId []struct {
SubscriptionIdType int `avp:"Subscription-Id-Type"`
SubscriptionIdData string `avp:"Subscription-Id-Data"`
} `avp:"Subscription-Id"`
ServiceIdentifier int `avp:"Service-Identifier"`
RequestedServiceUnit struct {
CCTime int `avp:"CC-Time"`
} `avp:"Requested-Service-Unit"`
UsedServiceUnit struct {
CCTime int `avp:"CC-Time"`
} `avp:"Used-Service-Unit"`
ServiceInformation struct {
INInformation struct {
CallingPartyAddress string `avp:"Calling-Party-Address"`
CalledPartyAddress string `avp:"Called-Party-Address"`
RealCalledNumber string `avp:"Real-Called-Number"`
ChargeFlowType int `avp:"Charge-Flow-Type"`
CallingVlrNumber string `avp:"Calling-Vlr-Number"`
CallingCellIDOrSAI string `avp:"Calling-CellID-Or-SAI"`
BearerCapability string `avp:"Bearer-Capability"`
CallReferenceNumber string `avp:"Call-Reference-Number"`
MSCAddress string `avp:"MSC-Address"`
TimeZone int `avp:"Time-Zone"`
CalledPartyNP string `avp:"Called-Party-NP"`
SSPTime string `avp:"SSP-Time"`
} `avp:"IN-Information"`
} `avp:"Service-Information"`
diamMessage *diam.Message // Used to parse fields with CGR templates
debitInterval time.Duration // Configured debit interval
}
// AsBareDiameterMessage converts CCR into a bare DiameterMessage
// Compatible with the required fields of CCA
func (self *CCR) AsBareDiameterMessage() *diam.Message {
m := diam.NewRequest(diam.CreditControl, 4, nil)
m.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(self.SessionId))
m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(self.OriginHost))
m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(self.OriginRealm))
m.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(self.AuthApplicationId))
m.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(self.CCRequestType))
m.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(self.CCRequestNumber))
return m
}
// Used when sending from client to agent
func (self *CCR) AsDiameterMessage() (*diam.Message, error) {
m := self.AsBareDiameterMessage()
if _, err := m.NewAVP("Destination-Host", avp.Mbit, 0, datatype.DiameterIdentity(self.DestinationHost)); err != nil {
return nil, err
}
if _, err := m.NewAVP("Destination-Realm", avp.Mbit, 0, datatype.DiameterIdentity(self.DestinationRealm)); err != nil {
return nil, err
}
if _, err := m.NewAVP("Service-Context-Id", avp.Mbit, 0, datatype.UTF8String(self.ServiceContextId)); err != nil {
return nil, err
}
if _, err := m.NewAVP("Event-Timestamp", avp.Mbit, 0, datatype.Time(self.EventTimestamp)); err != nil {
return nil, err
}
for _, subscriptionId := range self.SubscriptionId {
if _, err := m.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(subscriptionId.SubscriptionIdType)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String(subscriptionId.SubscriptionIdData)), // Subscription-Id-Data
}}); err != nil {
return nil, err
}
}
if _, err := m.NewAVP("Service-Identifier", avp.Mbit, 0, datatype.Unsigned32(self.ServiceIdentifier)); err != nil {
return nil, err
}
if _, err := m.NewAVP("Requested-Service-Unit", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(self.RequestedServiceUnit.CCTime))}}); err != nil { // CC-Time
return nil, err
}
if _, err := m.NewAVP("Used-Service-Unit", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(self.UsedServiceUnit.CCTime))}}); err != nil { // CC-Time
return nil, err
}
if _, err := m.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information
AVP: []*diam.AVP{
diam.NewAVP(831, avp.Mbit, 10415, datatype.UTF8String(self.ServiceInformation.INInformation.CallingPartyAddress)), // Calling-Party-Address
diam.NewAVP(832, avp.Mbit, 10415, datatype.UTF8String(self.ServiceInformation.INInformation.CalledPartyAddress)), // Called-Party-Address
diam.NewAVP(20327, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.RealCalledNumber)), // Real-Called-Number
diam.NewAVP(20339, avp.Mbit, 2011, datatype.Unsigned32(self.ServiceInformation.INInformation.ChargeFlowType)), // Charge-Flow-Type
diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.CallingVlrNumber)), // Calling-Vlr-Number
diam.NewAVP(20303, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.CallingCellIDOrSAI)), // Calling-CellID-Or-SAI
diam.NewAVP(20313, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.BearerCapability)), // Bearer-Capability
diam.NewAVP(20321, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.CallReferenceNumber)), // Call-Reference-Number
diam.NewAVP(20322, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.MSCAddress)), // MSC-Address
diam.NewAVP(20324, avp.Mbit, 2011, datatype.Unsigned32(self.ServiceInformation.INInformation.TimeZone)), // Time-Zone
diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.CalledPartyNP)), // Called-Party-NP
diam.NewAVP(20386, avp.Mbit, 2011, datatype.UTF8String(self.ServiceInformation.INInformation.SSPTime)), // SSP-Time
},
}),
}}); err != nil {
return nil, err
}
return m, nil
}
// Extracts data out of CCR into a SMGenericEvent based on the configured template
func (self *CCR) AsMapIface(cfgFlds []*config.CfgCdrField) (map[string]interface{}, error) {
outMap := make(map[string]string) // work with it so we can append values to keys
outMap[utils.EVENT_NAME] = DIAMETER_CCR
for _, cfgFld := range cfgFlds {
fmtOut, err := fieldOutVal(self.diamMessage, cfgFld, self.debitInterval, nil)
if err != nil {
if err == ErrFilterNotPassing {
continue // Do nothing in case of Filter not passing
}
return nil, err
}
if _, hasKey := outMap[cfgFld.FieldId]; hasKey && cfgFld.Append {
outMap[cfgFld.FieldId] += fmtOut
} else {
outMap[cfgFld.FieldId] = fmtOut
}
if cfgFld.BreakOnSuccess {
break
}
}
return utils.ConvertMapValStrIf(outMap), nil
}
func NewBareCCAFromCCR(ccr *CCR, originHost, originRealm string) *CCA {
cca := &CCA{SessionId: ccr.SessionId, AuthApplicationId: ccr.AuthApplicationId, CCRequestType: ccr.CCRequestType, CCRequestNumber: ccr.CCRequestNumber,
OriginHost: originHost, OriginRealm: originRealm,
diamMessage: diam.NewMessage(ccr.diamMessage.Header.CommandCode, ccr.diamMessage.Header.CommandFlags&^diam.RequestFlag, ccr.diamMessage.Header.ApplicationID,
ccr.diamMessage.Header.HopByHopID, ccr.diamMessage.Header.EndToEndID, ccr.diamMessage.Dictionary()), ccrMessage: ccr.diamMessage, debitInterval: ccr.debitInterval,
}
cca.diamMessage = cca.AsBareDiameterMessage() // Add the required fields to the diameterMessage
return cca
}
// Call Control Answer, bare structure so we can dynamically manage adding it's fields
type CCA struct {
SessionId string `avp:"Session-Id"`
OriginHost string `avp:"Origin-Host"`
OriginRealm string `avp:"Origin-Realm"`
AuthApplicationId int `avp:"Auth-Application-Id"`
CCRequestType int `avp:"CC-Request-Type"`
CCRequestNumber int `avp:"CC-Request-Number"`
ResultCode int `avp:"Result-Code"`
GrantedServiceUnit struct {
CCTime int `avp:"CC-Time"`
} `avp:"Granted-Service-Unit"`
ccrMessage *diam.Message
diamMessage *diam.Message
debitInterval time.Duration
timezone string
}
// AsBareDiameterMessage converts CCA into a bare DiameterMessage
func (self *CCA) AsBareDiameterMessage() *diam.Message {
var m diam.Message
utils.Clone(self.diamMessage, &m)
m.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(self.SessionId))
m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(self.OriginHost))
m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(self.OriginRealm))
m.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(self.AuthApplicationId))
m.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(self.CCRequestType))
m.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Enumerated(self.CCRequestNumber))
m.NewAVP(avp.ResultCode, avp.Mbit, 0, datatype.Unsigned32(self.ResultCode))
return &m
}
// AsDiameterMessage returns the diameter.Message which can be later written on network
func (self *CCA) AsDiameterMessage() *diam.Message {
return self.diamMessage
}
// SetProcessorAVPs will add AVPs to self.diameterMessage based on template defined in processor.CCAFields
func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, processorVars processorVars) error {
for _, cfgFld := range reqProcessor.CCAFields {
fmtOut, err := fieldOutVal(self.ccrMessage, cfgFld, nil, processorVars)
if err == ErrFilterNotPassing { // Field not in or filter not passing, try match in answer
fmtOut, err = fieldOutVal(self.diamMessage, cfgFld, nil, processorVars)
}
if err != nil {
if err == ErrFilterNotPassing {
continue
}
return err
}
if err := messageSetAVPsWithPath(self.diamMessage,
splitIntoInterface(cfgFld.FieldId, utils.HIERARCHY_SEP),
fmtOut, cfgFld.Append, self.timezone); err != nil {
return err
}
if cfgFld.BreakOnSuccess { // don't look for another field
break
}
}
return nil
}

View File

@@ -17,548 +17,3 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/diam"
"github.com/fiorix/go-diameter/diam/avp"
"github.com/fiorix/go-diameter/diam/datatype"
"github.com/fiorix/go-diameter/diam/dict"
)
var err error
func TestDisectUsageForCCR(t *testing.T) {
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(0)*time.Second,
time.Duration(300)*time.Second, false); reqType != 1 || reqNr != 0 || reqCCTime != 300 || usedCCTime != 0 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(35)*time.Second, time.Duration(300)*time.Second, false); reqType != 2 || reqNr != 0 || reqCCTime != 300 || usedCCTime != 35 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(935)*time.Second, time.Duration(300)*time.Second, false); reqType != 2 || reqNr != 3 || reqCCTime != 300 || usedCCTime != 35 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(35)*time.Second, time.Duration(300)*time.Second, true); reqType != 3 || reqNr != 1 || reqCCTime != 0 || usedCCTime != 35 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(610)*time.Second, time.Duration(300)*time.Second, true); reqType != 3 || reqNr != 3 || reqCCTime != 0 || usedCCTime != 10 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
if reqType, reqNr, reqCCTime, usedCCTime := disectUsageForCCR(time.Duration(935)*time.Second, time.Duration(300)*time.Second, true); reqType != 3 || reqNr != 4 || reqCCTime != 0 || usedCCTime != 35 {
t.Error(reqType, reqNr, reqCCTime, usedCCTime)
}
}
func TestUsageFromCCR(t *testing.T) {
if usage := usageFromCCR(1, 0, 0, time.Duration(300)*time.Second); usage != time.Duration(300)*time.Second {
t.Error(usage)
}
if usage := usageFromCCR(2, 0, 300, time.Duration(300)*time.Second); usage != time.Duration(300)*time.Second {
t.Error(usage)
}
if usage := usageFromCCR(2, 3, 300, time.Duration(300)*time.Second); usage != time.Duration(300)*time.Second {
t.Error(usage.Seconds())
}
if usage := usageFromCCR(3, 3, 10, time.Duration(300)*time.Second); usage != time.Duration(610)*time.Second {
t.Error(usage)
}
if usage := usageFromCCR(3, 4, 35, time.Duration(300)*time.Second); usage != time.Duration(935)*time.Second {
t.Error(usage)
}
if usage := usageFromCCR(3, 1, 35, time.Duration(300)*time.Second); usage != time.Duration(35)*time.Second {
t.Error(usage)
}
if usage := usageFromCCR(1, 0, 0, time.Duration(360)*time.Second); usage != time.Duration(360)*time.Second {
t.Error(usage)
}
}
func TestAvpValAsString(t *testing.T) {
originHostStr := "unit_test"
a := diam.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(originHostStr))
if avpValStr := avpValAsString(a); avpValStr != originHostStr {
t.Errorf("Expected: %s, received: %s", originHostStr, avpValStr)
}
}
func TestMetaValueExponent(t *testing.T) {
m := diam.NewRequest(diam.CreditControl, 4, nil)
m.NewAVP("Session-Id", avp.Mbit, 0, datatype.UTF8String("simuhuawei;1449573472;00002"))
m.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.CCMoney, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.UnitValue, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.ValueDigits, avp.Mbit, 0, datatype.Integer64(10000)),
diam.NewAVP(avp.Exponent, avp.Mbit, 0, datatype.Integer32(-5)),
},
}),
diam.NewAVP(avp.CurrencyCode, avp.Mbit, 0, datatype.Unsigned32(33)),
},
}),
},
})
if val, err := metaValueExponent(m, nil, utils.ParseRSRFieldsMustCompile(
"Requested-Service-Unit>CC-Money>Unit-Value>Value-Digits;^|;Requested-Service-Unit>CC-Money>Unit-Value>Exponent",
utils.INFIELD_SEP), 10); err != nil {
t.Error(err)
} else if val != "0.1" {
t.Error("Received: ", val)
}
if _, err = metaValueExponent(m, nil,
utils.ParseRSRFieldsMustCompile(
"Requested-Service-Unit>CC-Money>Unit-Value>Value-Digits;Requested-Service-Unit>CC-Money>Unit-Value>Exponent",
utils.INFIELD_SEP), 10); err == nil {
t.Error("Should have received error") // Insufficient number arguments
}
}
func TestMetaSum(t *testing.T) {
m := diam.NewRequest(diam.CreditControl, 4, nil)
m.NewAVP("Session-Id", avp.Mbit, 0, datatype.UTF8String("simuhuawei;1449573472;00002"))
m.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.CCMoney, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.UnitValue, avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(avp.ValueDigits, avp.Mbit, 0, datatype.Integer64(10000)),
diam.NewAVP(avp.Exponent, avp.Mbit, 0, datatype.Integer32(-5)),
},
}),
diam.NewAVP(avp.CurrencyCode, avp.Mbit, 0, datatype.Unsigned32(33)),
},
}),
},
})
if val, err := metaSum(m, nil,
utils.ParseRSRFieldsMustCompile(
"Requested-Service-Unit>CC-Money>Unit-Value>Value-Digits;^|;Requested-Service-Unit>CC-Money>Unit-Value>Exponent",
utils.INFIELD_SEP), 0, 10); err != nil {
t.Error(err)
} else if val != "9995" {
t.Error("Received: ", val)
}
if _, err = metaSum(m, nil, utils.ParseRSRFieldsMustCompile(
"Requested-Service-Unit>CC-Money>Unit-Value>Value-Digits;Requested-Service-Unit>CC-Money>Unit-Value>Exponent",
utils.INFIELD_SEP), 0, 10); err == nil {
t.Error("Should have received error") // Insufficient number arguments
}
}
func TestFieldOutVal(t *testing.T) {
m := diam.NewRequest(diam.CreditControl, 4, nil)
m.NewAVP("Session-Id", avp.Mbit, 0, datatype.UTF8String("simuhuawei;1449573472;00002"))
m.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
}})
m.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(1)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("208708000003")), // Subscription-Id-Data
}})
m.NewAVP("Service-Identifier", avp.Mbit, 0, datatype.Unsigned32(0))
m.NewAVP("Requested-Service-Unit", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(360))}}) // CC-Time
cfgFld := &config.CfgCdrField{Tag: "StaticTest", Type: utils.META_COMPOSED, FieldId: utils.ToR,
Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true}
eOut := "*voice"
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), nil); err != nil {
t.Error(err)
} else if fldOut != eOut {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
}
cfgFld = &config.CfgCdrField{Tag: "ComposedTest", Type: utils.META_COMPOSED, FieldId: utils.Destination,
Value: utils.ParseRSRFieldsMustCompile("Requested-Service-Unit>CC-Time", utils.INFIELD_SEP), Mandatory: true}
eOut = "360"
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), nil); err != nil {
t.Error(err)
} else if fldOut != eOut {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
}
// With filter on ProcessorVars
cfgFld = &config.CfgCdrField{Tag: "ComposedTestWithProcessorVarsFilter", Type: utils.META_COMPOSED, FieldId: utils.Destination,
FieldFilter: utils.ParseRSRFieldsMustCompile("CGRError(INSUFFICIENT_CREDIT)", utils.INFIELD_SEP),
Value: utils.ParseRSRFieldsMustCompile("Requested-Service-Unit>CC-Time", utils.INFIELD_SEP), Mandatory: true}
if _, err := fieldOutVal(m, cfgFld, time.Duration(0), nil); err == nil {
t.Error("Should have error")
}
eOut = "360"
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0),
processorVars{"CGRError": "INSUFFICIENT_CREDIT"}); err != nil {
t.Error(err)
} else if fldOut != eOut {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
}
// Without filter, we shoud get always the first subscriptionId
cfgFld = &config.CfgCdrField{Tag: "Grouped1", Type: utils.MetaGrouped, FieldId: "Account",
Value: utils.ParseRSRFieldsMustCompile("Subscription-Id>Subscription-Id-Data", utils.INFIELD_SEP), Mandatory: true}
eOut = "33708000003"
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), nil); err != nil {
t.Error(err)
} else if fldOut != eOut {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
}
// Without groupedAVP, we shoud get the first subscriptionId
cfgFld = &config.CfgCdrField{
Tag: "Grouped2",
Type: utils.MetaGrouped, FieldId: "Account",
FieldFilter: utils.ParseRSRFieldsMustCompile(
"Subscription-Id>Subscription-Id-Type(1)", utils.INFIELD_SEP),
Value: utils.ParseRSRFieldsMustCompile(
"Subscription-Id>Subscription-Id-Data", utils.INFIELD_SEP), Mandatory: true}
eOut = "208708000003"
if fldOut, err := fieldOutVal(m, cfgFld, time.Duration(0), nil); err != nil {
t.Error(err)
} else if fldOut != eOut {
t.Errorf("Expecting:\n%s\nReceived:\n%s", eOut, fldOut)
}
cfgFld = &config.CfgCdrField{
Tag: "TestMultipleFiltersEmptyReply",
Type: utils.META_COMPOSED, FieldId: "Account",
FieldFilter: utils.ParseRSRFieldsMustCompile(
"*cgrReply>Error(^$);*cgrReply>MaxUsage(!300);*cgrReply>MaxUsage(!0)",
utils.INFIELD_SEP),
Value: utils.ParseRSRFieldsMustCompile(
"Subscription-Id>Subscription-Id-Data", utils.INFIELD_SEP),
Mandatory: true}
procVars := processorVars{
utils.MetaCGRReply: map[string]interface{}{
utils.Error: "RALS_ERROR:NOT_FOUND",
},
}
if _, err := fieldOutVal(m, cfgFld, time.Duration(0),
procVars); err != ErrFilterNotPassing {
t.Error(err)
}
}
func TestSerializeAVPValueFromString(t *testing.T) {
dictAVP, _ := dict.Default.FindAVP(4, "Session-Id")
eValByte := []byte("simuhuawei;1449573472;00002")
if valByte, err := serializeAVPValueFromString(dictAVP, "simuhuawei;1449573472;00002", "UTC"); err != nil {
t.Error(err)
} else if !bytes.Equal(eValByte, valByte) {
t.Errorf("Expecting: %+v, received: %+v", eValByte, valByte)
}
dictAVP, _ = dict.Default.FindAVP(4, "Result-Code")
eValByte = make([]byte, 4)
binary.BigEndian.PutUint32(eValByte, uint32(5031))
if valByte, err := serializeAVPValueFromString(dictAVP, "5031", "UTC"); err != nil {
t.Error(err)
} else if !bytes.Equal(eValByte, valByte) {
t.Errorf("Expecting: %+v, received: %+v", eValByte, valByte)
}
}
func TestMessageSetAVPsWithPath(t *testing.T) {
eMessage := diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Session-Id", avp.Mbit, 0,
datatype.UTF8String("simuhuawei;1449573472;00002"))
m := diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4,
eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m,
[]interface{}{"Session-Id", "Unknown"}, "simuhuawei;1449573472;00002",
false, "UTC"); err == nil ||
err.Error() != "Could not find AVP Unknown" {
t.Error(err)
}
if err := messageSetAVPsWithPath(m,
[]interface{}{"Session-Id"}, "simuhuawei;1449573472;00002",
false, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
// test append
eMessage.NewAVP("Session-Id", avp.Mbit, 0,
datatype.UTF8String("simuhuawei;1449573472;00003"))
if err := messageSetAVPsWithPath(m, []interface{}{"Session-Id"},
"simuhuawei;1449573472;00003", true, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
// test overwrite
eMessage = diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Session-Id", avp.Mbit, 0,
datatype.UTF8String("simuhuawei;1449573472;00002"))
m = diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4,
eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m,
[]interface{}{"Session-Id"}, "simuhuawei;1449573472;00001",
false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m,
[]interface{}{"Session-Id"}, "simuhuawei;1449573472;00002",
false, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
eMessage = diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
}})
m = diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4,
eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m,
[]interface{}{"Subscription-Id", "Subscription-Id-Data"},
"33708000003", false, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
// test append
eMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Data
}})
if err := messageSetAVPsWithPath(m,
[]interface{}{"Subscription-Id", "Subscription-Id-Type"},
"0", true, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
// test group append
eMessage = diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Data
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
}})
eMsgSrl, _ := eMessage.Serialize()
m = diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4, eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m, []interface{}{"Subscription-Id", "Subscription-Id-Type"}, "0", false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Subscription-Id", "Subscription-Id-Data"}, "33708000003", false, "UTC"); err != nil {
t.Error(err)
} else {
mSrl, _ := m.Serialize()
if !bytes.Equal(eMsgSrl, mSrl) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
}
eMessage = diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Granted-Service-Unit", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(300)), // Subscription-Id-Data
}})
m = diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4, eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m, []interface{}{"Granted-Service-Unit", "CC-Time"}, "300", false, "UTC"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMessage, m) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
// Multiple append
eMessage = diam.NewRequest(diam.CreditControl, 4, nil)
eMessage.NewAVP("Multiple-Services-Credit-Control", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(431, avp.Mbit, 0, &diam.GroupedAVP{ // Granted-Service-Unit
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(3600)),
diam.NewAVP(421, avp.Mbit, 0, datatype.Unsigned64(153600)), // "CC-Total-Octets"
},
}),
diam.NewAVP(432, avp.Mbit, 0, datatype.Unsigned32(10)),
},
})
eMessage.NewAVP("Multiple-Services-Credit-Control", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(431, avp.Mbit, 0, &diam.GroupedAVP{ // Granted-Service-Unit
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(2600)),
diam.NewAVP(421, avp.Mbit, 0, datatype.Unsigned64(143600)), // "CC-Total-Octets"
},
}),
diam.NewAVP(432, avp.Mbit, 0, datatype.Unsigned32(11)), // Rating-Group
},
})
m = diam.NewMessage(diam.CreditControl, diam.RequestFlag, 4, eMessage.Header.HopByHopID, eMessage.Header.EndToEndID, nil)
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Granted-Service-Unit", "CC-Time"}, "3600", false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Granted-Service-Unit", "CC-Total-Octets"}, "153600", false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Rating-Group"}, "10", false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Granted-Service-Unit", "CC-Time"}, "2600", true, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Granted-Service-Unit", "CC-Total-Octets"}, "143600", false, "UTC"); err != nil {
t.Error(err)
}
if err := messageSetAVPsWithPath(m, []interface{}{"Multiple-Services-Credit-Control", "Rating-Group"}, "11", false, "UTC"); err != nil {
t.Error(err)
}
if fmt.Sprintf("%q", eMessage) != fmt.Sprintf("%q", m) { // test with fmt since reflect.DeepEqual does not perform properly here
t.Errorf("Expecting: %+v, received: %+v", eMessage, m)
}
}
func TestCCASetProcessorAVPs(t *testing.T) {
ccr := &CCR{ // Bare information, just the one needed for answer
SessionId: "routinga;1442095190;1476802709",
AuthApplicationId: 4,
CCRequestType: 1,
CCRequestNumber: 0,
}
ccr.diamMessage = ccr.AsBareDiameterMessage()
ccr.diamMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
}})
ccr.debitInterval = time.Duration(300) * time.Second
cca := NewBareCCAFromCCR(ccr, "CGR-DA", "cgrates.org")
reqProcessor := &config.DARequestProcessor{Id: "UNIT_TEST", // Set template for tests
CCAFields: []*config.CfgCdrField{
&config.CfgCdrField{Tag: "Subscription-Id/Subscription-Id-Type", Type: utils.META_COMPOSED,
FieldId: "Subscription-Id>Subscription-Id-Type",
Value: utils.ParseRSRFieldsMustCompile("Subscription-Id>Subscription-Id-Type", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Subscription-Id/Subscription-Id-Data", Type: utils.META_COMPOSED,
FieldId: "Subscription-Id>Subscription-Id-Data",
Value: utils.ParseRSRFieldsMustCompile("Subscription-Id>Subscription-Id-Data", utils.INFIELD_SEP), Mandatory: true},
},
}
eMessage := cca.AsDiameterMessage()
eMessage.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data
}})
if err := cca.SetProcessorAVPs(reqProcessor, processorVars{}); err != nil {
t.Error(err)
} else if ccaMsg := cca.AsDiameterMessage(); !reflect.DeepEqual(eMessage, ccaMsg) {
t.Errorf("Expecting: %+v, received: %+v", eMessage, ccaMsg)
}
}
func TestCCRAsSMGenericEvent(t *testing.T) {
ccr := &CCR{ // Bare information, just the one needed for answer
SessionId: "ccrasgen1",
AuthApplicationId: 4,
CCRequestType: 3,
}
ccr.diamMessage = ccr.AsBareDiameterMessage()
ccr.diamMessage.NewAVP("Multiple-Services-Credit-Control", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(446, avp.Mbit, 0, &diam.GroupedAVP{ // Used-Service-Unit
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(17)), // CC-Time
diam.NewAVP(412, avp.Mbit, 0, datatype.Unsigned64(1341)), // CC-Input-Octets
diam.NewAVP(414, avp.Mbit, 0, datatype.Unsigned64(3079)), // CC-Output-Octets
},
}),
diam.NewAVP(432, avp.Mbit, 0, datatype.Unsigned32(99)),
},
})
ccr.diamMessage.NewAVP("Multiple-Services-Credit-Control", avp.Mbit, 0, &diam.GroupedAVP{
AVP: []*diam.AVP{
diam.NewAVP(446, avp.Mbit, 0, &diam.GroupedAVP{ // Used-Service-Unit
AVP: []*diam.AVP{
diam.NewAVP(452, avp.Mbit, 0, datatype.Enumerated(0)), // Tariff-Change-Usage
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(20)), // CC-Time
diam.NewAVP(412, avp.Mbit, 0, datatype.Unsigned64(8046)), // CC-Input-Octets
diam.NewAVP(414, avp.Mbit, 0, datatype.Unsigned64(46193)), // CC-Output-Octets
},
}),
diam.NewAVP(432, avp.Mbit, 0, datatype.Unsigned32(1)),
},
})
ccr.diamMessage.NewAVP("FramedIPAddress", avp.Mbit, 0, datatype.OctetString("0AE40041"))
cfgFlds := make([]*config.CfgCdrField, 0)
eSMGEv := map[string]interface{}{"EventName": "DIAMETER_CCR"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)
}
cfgFlds = []*config.CfgCdrField{
&config.CfgCdrField{
Tag: "LastUsed",
FieldFilter: utils.ParseRSRFieldsMustCompile("~Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets:s/^(.*)$/test/(test);Multiple-Services-Credit-Control>Rating-Group(1)", utils.INFIELD_SEP),
FieldId: "LastUsed",
Type: "*handler",
HandlerId: "*sum",
Value: utils.ParseRSRFieldsMustCompile("Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets;^|;Multiple-Services-Credit-Control>Used-Service-Unit>CC-Output-Octets", utils.INFIELD_SEP),
Mandatory: true,
},
}
eSMGEv = map[string]interface{}{"EventName": "DIAMETER_CCR", "LastUsed": "54239"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)
}
cfgFlds = []*config.CfgCdrField{
&config.CfgCdrField{
Tag: "LastUsed",
FieldFilter: utils.ParseRSRFieldsMustCompile("~Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets:s/^(.*)$/test/(test);Multiple-Services-Credit-Control>Rating-Group(99)", utils.INFIELD_SEP),
FieldId: "LastUsed",
Type: "*handler",
HandlerId: "*sum",
Value: utils.ParseRSRFieldsMustCompile("Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets;^|;Multiple-Services-Credit-Control>Used-Service-Unit>CC-Output-Octets", utils.INFIELD_SEP),
Mandatory: true,
},
}
eSMGEv = map[string]interface{}{"EventName": "DIAMETER_CCR", "LastUsed": "4420"}
if rSMGEv, err := ccr.AsMapIface(cfgFlds); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, rSMGEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, rSMGEv)
}
}
func TestPassesFieldFilter(t *testing.T) {
m := diam.NewRequest(diam.CreditControl, 4, nil) // Multiple-Services-Credit-Control>Rating-Group
if pass, _ := passesFieldFilter(m,
utils.ParseRSRFieldsMustCompile("Multiple-Services-Credit-Control>Rating-Group(^$)",
utils.INFIELD_SEP)[0], nil); !pass {
t.Error("Does not pass")
}
procVars := processorVars{
utils.MetaCGRReply: map[string]interface{}{
utils.CapAttributes: map[string]interface{}{
"RadReply": "AccessAccept",
utils.Account: "1001",
},
utils.CapMaxUsage: time.Duration(0),
utils.Error: "",
},
}
if pass, _ := passesFieldFilter(nil,
utils.ParseRSRFieldsMustCompile("*cgrReply>MaxUsage(^0s$)", utils.INFIELD_SEP)[0],
procVars); !pass {
t.Error("not passing valid filter")
}
if pass, _ := passesFieldFilter(nil,
utils.ParseRSRFieldsMustCompile("*cgrReply>MaxUsage{*duration_seconds}(^0$)", utils.INFIELD_SEP)[0],
procVars); !pass {
t.Error("not passing valid filter")
}
if pass, _ := passesFieldFilter(nil,
utils.ParseRSRFieldsMustCompile("*cgrReply>Error(^$)", utils.INFIELD_SEP)[0],
procVars); !pass {
t.Error("not passing valid filter")
}
}