mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Diameter configuration updates, removed connection to ThresholdS, SessionSv1.ProcessEvent now publishing event to ThresholdS and StatS
This commit is contained in:
@@ -32,19 +32,16 @@ import (
|
||||
)
|
||||
|
||||
func NewDiameterAgent(cgrCfg *config.CGRConfig,
|
||||
sessionS, thdS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
|
||||
sessionS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
|
||||
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
|
||||
sessionS = nil
|
||||
}
|
||||
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
|
||||
thdS = nil
|
||||
}
|
||||
da := &DiameterAgent{
|
||||
cgrCfg: cgrCfg, sessionS: sessionS,
|
||||
thdS: thdS, connMux: new(sync.Mutex)}
|
||||
dictsDir := cgrCfg.DiameterAgentCfg().DictionariesDir
|
||||
if len(dictsDir) != 0 {
|
||||
if err := loadDictionaries(dictsDir, utils.DiameterAgent); err != nil {
|
||||
connMux: new(sync.Mutex)}
|
||||
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
|
||||
if len(dictsPath) != 0 {
|
||||
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -54,7 +51,6 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig,
|
||||
type DiameterAgent struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
|
||||
thdS rpcclient.RpcClientConnection // Connection towards CGR-ThresholdS component
|
||||
connMux *sync.Mutex // Protect connection for read/write
|
||||
}
|
||||
|
||||
|
||||
@@ -185,7 +185,10 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
|
||||
evArgs := sessions.NewV1ProcessEventArgs(
|
||||
reqProcessor.Flags.HasKey(utils.MetaResources),
|
||||
reqProcessor.Flags.HasKey(utils.MetaAccounts),
|
||||
reqProcessor.Flags.HasKey(utils.MetaAttributes), *cgrEv)
|
||||
reqProcessor.Flags.HasKey(utils.MetaAttributes),
|
||||
reqProcessor.Flags.HasKey(utils.MetaThresholds),
|
||||
reqProcessor.Flags.HasKey(utils.MetaStats),
|
||||
*cgrEv)
|
||||
var eventRply sessions.V1ProcessEventReply
|
||||
err = ha.sessionS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, &eventRply)
|
||||
|
||||
@@ -227,7 +227,10 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor,
|
||||
evArgs := sessions.NewV1ProcessEventArgs(
|
||||
reqProcessor.Flags.HasKey(utils.MetaResources),
|
||||
reqProcessor.Flags.HasKey(utils.MetaAccounts),
|
||||
reqProcessor.Flags.HasKey(utils.MetaAttributes), *cgrEv)
|
||||
reqProcessor.Flags.HasKey(utils.MetaAttributes),
|
||||
reqProcessor.Flags.HasKey(utils.MetaThresholds),
|
||||
reqProcessor.Flags.HasKey(utils.MetaStats),
|
||||
*cgrEv)
|
||||
var eventRply sessions.V1ProcessEventReply
|
||||
err = ra.sessionS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, &eventRply)
|
||||
|
||||
@@ -278,10 +278,10 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startDiameterAgent(internalSMGChan, internalThdSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
|
||||
func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
|
||||
var err error
|
||||
utils.Logger.Info("Starting CGRateS DiameterAgent service")
|
||||
var smgConn, thdSConn *rpcclient.RpcClientPool
|
||||
var smgConn *rpcclient.RpcClientPool
|
||||
if len(cfg.DiameterAgentCfg().SessionSConns) != 0 {
|
||||
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
@@ -293,18 +293,7 @@ func startDiameterAgent(internalSMGChan, internalThdSChan chan rpcclient.RpcClie
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.DiameterAgentCfg().ThresholdSConns) != 0 {
|
||||
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.DiameterAgentCfg().ThresholdSConns, internalThdSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.DiameterAgent, utils.ThresholdS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
da, err := agents.NewDiameterAgent(cfg, smgConn, thdSConn)
|
||||
da, err := agents.NewDiameterAgent(cfg, smgConn)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
|
||||
exitChan <- true
|
||||
@@ -1300,7 +1289,7 @@ func main() {
|
||||
}
|
||||
|
||||
if cfg.DiameterAgentCfg().Enabled {
|
||||
go startDiameterAgent(internalSMGChan, internalThresholdSChan, exitChan)
|
||||
go startDiameterAgent(internalSMGChan, exitChan)
|
||||
}
|
||||
|
||||
if cfg.RadiusAgentCfg().Enabled {
|
||||
|
||||
@@ -634,13 +634,8 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
if self.diameterAgentCfg.Enabled {
|
||||
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
|
||||
if daSMGConn.Address == utils.MetaInternal && !self.sessionSCfg.Enabled {
|
||||
return errors.New("SMGeneric not enabled but referenced by DiameterAgent component")
|
||||
}
|
||||
}
|
||||
for _, conn := range self.diameterAgentCfg.ThresholdSConns {
|
||||
if conn.Address == utils.MetaInternal && !self.ThresholdSCfg().Enabled {
|
||||
return fmt.Errorf("%s not enabled but requested by %s component.",
|
||||
utils.ThresholdS, utils.DiameterAgent)
|
||||
return fmt.Errorf("%s not enabled but referenced by %s component",
|
||||
utils.SessionS, utils.DiameterAgent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -365,15 +365,10 @@ const CGRATES_CFG_JSON = `
|
||||
"diameter_agent": {
|
||||
"enabled": false, // enables the diameter agent: <true|false>
|
||||
"listen": "127.0.0.1:3868", // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
"dictionaries_dir": "/usr/share/cgrates/diameter/dict/", // path towards directory holding additional dictionaries to load
|
||||
"dictionaries_path": "/usr/share/cgrates/diameter/dict/", // path towards directory holding additional dictionaries to load
|
||||
"sessions_conns": [
|
||||
{"address": "*internal"} // connection towards SessionService
|
||||
],
|
||||
"thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
|
||||
"create_cdr": true, // create CDR out of CCR terminate and send it to SessionS
|
||||
"cdr_requires_session": true, // only create CDR if there is an active session at terminate
|
||||
"debit_interval": "5m", // interval for CCR updates
|
||||
"timezone": "", // timezone for timestamps where not specified, empty for general defaults <""|UTC|Local|$IANA_TZ_DB>
|
||||
"origin_host": "CGR-DA", // diameter Origin-Host AVP used in replies
|
||||
"origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies
|
||||
"vendor_id": 0, // diameter Vendor-Id AVP used in replies
|
||||
|
||||
@@ -595,29 +595,24 @@ func TestAsteriskAgentJsonCfg(t *testing.T) {
|
||||
|
||||
func TestDiameterAgentJsonCfg(t *testing.T) {
|
||||
eCfg := &DiameterAgentJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Listen: utils.StringPointer("127.0.0.1:3868"),
|
||||
Dictionaries_dir: utils.StringPointer("/usr/share/cgrates/diameter/dict/"),
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Listen: utils.StringPointer("127.0.0.1:3868"),
|
||||
Dictionaries_path: utils.StringPointer("/usr/share/cgrates/diameter/dict/"),
|
||||
Sessions_conns: &[]*HaPoolJsonCfg{
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Thresholds_conns: &[]*HaPoolJsonCfg{},
|
||||
Create_cdr: utils.BoolPointer(true),
|
||||
Cdr_requires_session: utils.BoolPointer(true),
|
||||
Debit_interval: utils.StringPointer("5m"),
|
||||
Timezone: utils.StringPointer(""),
|
||||
Origin_host: utils.StringPointer("CGR-DA"),
|
||||
Origin_realm: utils.StringPointer("cgrates.org"),
|
||||
Vendor_id: utils.IntPointer(0),
|
||||
Product_name: utils.StringPointer("CGRateS"),
|
||||
Request_processors: &[]*DARequestProcessorJsnCfg{},
|
||||
Origin_host: utils.StringPointer("CGR-DA"),
|
||||
Origin_realm: utils.StringPointer("cgrates.org"),
|
||||
Vendor_id: utils.IntPointer(0),
|
||||
Product_name: utils.StringPointer("CGRateS"),
|
||||
Request_processors: &[]*DARequestProcessorJsnCfg{},
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.DiameterAgentJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
rcv := *cfg.Request_processors
|
||||
t.Errorf("Received: %+v", rcv[0].CCA_fields)
|
||||
t.Errorf("Received: %+v", rcv)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -637,9 +632,8 @@ func TestRadiusAgentJsonCfg(t *testing.T) {
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Cdr_requires_session: utils.BoolPointer(false),
|
||||
Timezone: utils.StringPointer(""),
|
||||
Request_processors: &[]*RAReqProcessorJsnCfg{},
|
||||
Timezone: utils.StringPointer(""),
|
||||
Request_processors: &[]*RAReqProcessorJsnCfg{},
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.RadiusAgentJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -937,15 +937,11 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) {
|
||||
|
||||
func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
|
||||
testDA := &DiameterAgentCfg{
|
||||
Enabled: false,
|
||||
Listen: "127.0.0.1:3868",
|
||||
DictionariesDir: "/usr/share/cgrates/diameter/dict/",
|
||||
Enabled: false,
|
||||
Listen: "127.0.0.1:3868",
|
||||
DictionariesPath: "/usr/share/cgrates/diameter/dict/",
|
||||
SessionSConns: []*HaPoolConfig{
|
||||
&HaPoolConfig{Address: "*internal"}},
|
||||
ThresholdSConns: []*HaPoolConfig{},
|
||||
CreateCDR: true,
|
||||
DebitInterval: 5 * time.Minute,
|
||||
Timezone: "",
|
||||
OriginHost: "CGR-DA",
|
||||
OriginRealm: "cgrates.org",
|
||||
VendorId: 0,
|
||||
@@ -959,26 +955,12 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.Listen, testDA.Listen) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.Listen, testDA.Listen)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.DictionariesDir, testDA.DictionariesDir) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.DictionariesDir, testDA.DictionariesDir)
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.DictionariesPath, testDA.DictionariesPath) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.DictionariesPath, testDA.DictionariesPath)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.ThresholdSConns,
|
||||
testDA.ThresholdSConns) {
|
||||
t.Errorf("expecting: %+v, received: %+v",
|
||||
cgrCfg.diameterAgentCfg.ThresholdSConns, testDA.ThresholdSConns)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.DebitInterval, testDA.DebitInterval) {
|
||||
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.DebitInterval, testDA.DebitInterval)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.Timezone, testDA.Timezone) {
|
||||
t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.Timezone, testDA.Timezone)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.OriginHost, testDA.OriginHost) {
|
||||
t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.OriginHost, testDA.OriginHost)
|
||||
}
|
||||
@@ -1079,7 +1061,6 @@ func TestRadiusAgentCfg(t *testing.T) {
|
||||
ClientSecrets: map[string]string{utils.META_DEFAULT: "CGRateS.org"},
|
||||
ClientDictionaries: map[string]string{utils.META_DEFAULT: "/usr/share/cgrates/radius/dict/"},
|
||||
SessionSConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
|
||||
CDRRequiresSession: false,
|
||||
Timezone: "",
|
||||
RequestProcessors: nil,
|
||||
}
|
||||
|
||||
@@ -19,87 +19,58 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type DiameterAgentCfg struct {
|
||||
Enabled bool // enables the diameter agent: <true|false>
|
||||
Listen string // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
DictionariesDir string
|
||||
SessionSConns []*HaPoolConfig // connections towards SMG component
|
||||
ThresholdSConns []*HaPoolConfig // connection towards pubsubs
|
||||
CreateCDR bool
|
||||
CDRRequiresSession bool
|
||||
DebitInterval time.Duration
|
||||
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
OriginHost string
|
||||
OriginRealm string
|
||||
VendorId int
|
||||
ProductName string
|
||||
RequestProcessors []*DARequestProcessor
|
||||
Enabled bool // enables the diameter agent: <true|false>
|
||||
Listen string // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
DictionariesPath string
|
||||
SessionSConns []*HaPoolConfig // connections towards SMG component
|
||||
OriginHost string
|
||||
OriginRealm string
|
||||
VendorId int
|
||||
ProductName string
|
||||
RequestProcessors []*DARequestProcessor
|
||||
}
|
||||
|
||||
func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) error {
|
||||
func (da *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) error {
|
||||
if jsnCfg == nil {
|
||||
return nil
|
||||
}
|
||||
if jsnCfg.Enabled != nil {
|
||||
self.Enabled = *jsnCfg.Enabled
|
||||
da.Enabled = *jsnCfg.Enabled
|
||||
}
|
||||
if jsnCfg.Listen != nil {
|
||||
self.Listen = *jsnCfg.Listen
|
||||
da.Listen = *jsnCfg.Listen
|
||||
}
|
||||
if jsnCfg.Dictionaries_dir != nil {
|
||||
self.DictionariesDir = *jsnCfg.Dictionaries_dir
|
||||
if jsnCfg.Dictionaries_path != nil {
|
||||
da.DictionariesPath = *jsnCfg.Dictionaries_path
|
||||
}
|
||||
if jsnCfg.Sessions_conns != nil {
|
||||
self.SessionSConns = make([]*HaPoolConfig, len(*jsnCfg.Sessions_conns))
|
||||
da.SessionSConns = make([]*HaPoolConfig, len(*jsnCfg.Sessions_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Sessions_conns {
|
||||
self.SessionSConns[idx] = NewDfltHaPoolConfig()
|
||||
self.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
da.SessionSConns[idx] = NewDfltHaPoolConfig()
|
||||
da.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Thresholds_conns != nil {
|
||||
self.ThresholdSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
|
||||
self.ThresholdSConns[idx] = NewDfltHaPoolConfig()
|
||||
self.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Create_cdr != nil {
|
||||
self.CreateCDR = *jsnCfg.Create_cdr
|
||||
}
|
||||
if jsnCfg.Cdr_requires_session != nil {
|
||||
self.CDRRequiresSession = *jsnCfg.Cdr_requires_session
|
||||
}
|
||||
if jsnCfg.Debit_interval != nil {
|
||||
var err error
|
||||
if self.DebitInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Debit_interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Timezone != nil {
|
||||
self.Timezone = *jsnCfg.Timezone
|
||||
}
|
||||
if jsnCfg.Origin_host != nil {
|
||||
self.OriginHost = *jsnCfg.Origin_host
|
||||
da.OriginHost = *jsnCfg.Origin_host
|
||||
}
|
||||
if jsnCfg.Origin_realm != nil {
|
||||
self.OriginRealm = *jsnCfg.Origin_realm
|
||||
da.OriginRealm = *jsnCfg.Origin_realm
|
||||
}
|
||||
if jsnCfg.Vendor_id != nil {
|
||||
self.VendorId = *jsnCfg.Vendor_id
|
||||
da.VendorId = *jsnCfg.Vendor_id
|
||||
}
|
||||
if jsnCfg.Product_name != nil {
|
||||
self.ProductName = *jsnCfg.Product_name
|
||||
da.ProductName = *jsnCfg.Product_name
|
||||
}
|
||||
if jsnCfg.Request_processors != nil {
|
||||
for _, reqProcJsn := range *jsnCfg.Request_processors {
|
||||
rp := new(DARequestProcessor)
|
||||
var haveID bool
|
||||
for _, rpSet := range self.RequestProcessors {
|
||||
for _, rpSet := range da.RequestProcessors {
|
||||
if reqProcJsn.Id != nil && rpSet.Id == *reqProcJsn.Id {
|
||||
rp = rpSet // Will load data into the one set
|
||||
haveID = true
|
||||
@@ -110,7 +81,7 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro
|
||||
return nil
|
||||
}
|
||||
if !haveID {
|
||||
self.RequestProcessors = append(self.RequestProcessors, rp)
|
||||
da.RequestProcessors = append(da.RequestProcessors, rp)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,53 +91,45 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro
|
||||
// One Diameter request processor configuration
|
||||
type DARequestProcessor struct {
|
||||
Id string
|
||||
DryRun bool
|
||||
ThresholdSEvent bool
|
||||
RequestFilter utils.RSRFields
|
||||
Flags utils.StringMap // Various flags to influence behavior
|
||||
Tenant RSRParsers
|
||||
Filters []string
|
||||
Flags utils.StringMap
|
||||
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
ContinueOnSuccess bool
|
||||
AppendCCA bool
|
||||
CCRFields []*CfgCdrField
|
||||
CCAFields []*CfgCdrField
|
||||
RequestFields []*FCTemplate
|
||||
ReplyFields []*FCTemplate
|
||||
}
|
||||
|
||||
func (self *DARequestProcessor) loadFromJsonCfg(jsnCfg *DARequestProcessorJsnCfg) error {
|
||||
func (dap *DARequestProcessor) loadFromJsonCfg(jsnCfg *DARequestProcessorJsnCfg) error {
|
||||
if jsnCfg == nil {
|
||||
return nil
|
||||
}
|
||||
if jsnCfg.Id != nil {
|
||||
self.Id = *jsnCfg.Id
|
||||
dap.Id = *jsnCfg.Id
|
||||
}
|
||||
if jsnCfg.Dry_run != nil {
|
||||
self.DryRun = *jsnCfg.Dry_run
|
||||
if jsnCfg.Tenant != nil {
|
||||
dap.Tenant = NewRSRParsersMustCompile(*jsnCfg.Tenant, true)
|
||||
}
|
||||
if jsnCfg.Thresholds_event != nil {
|
||||
self.ThresholdSEvent = *jsnCfg.Thresholds_event
|
||||
}
|
||||
var err error
|
||||
if jsnCfg.Request_filter != nil {
|
||||
if self.RequestFilter, err = utils.ParseRSRFields(*jsnCfg.Request_filter, utils.INFIELD_SEP); err != nil {
|
||||
return err
|
||||
if jsnCfg.Filters != nil {
|
||||
dap.Filters = make([]string, len(*jsnCfg.Filters))
|
||||
for i, fltr := range *jsnCfg.Filters {
|
||||
dap.Filters[i] = fltr
|
||||
}
|
||||
}
|
||||
if jsnCfg.Flags != nil {
|
||||
self.Flags = utils.StringMapFromSlice(*jsnCfg.Flags)
|
||||
dap.Flags = utils.StringMapFromSlice(*jsnCfg.Flags)
|
||||
}
|
||||
if jsnCfg.Timezone != nil {
|
||||
dap.Timezone = *jsnCfg.Timezone
|
||||
}
|
||||
if jsnCfg.Continue_on_success != nil {
|
||||
self.ContinueOnSuccess = *jsnCfg.Continue_on_success
|
||||
dap.ContinueOnSuccess = *jsnCfg.Continue_on_success
|
||||
}
|
||||
if jsnCfg.Append_cca != nil {
|
||||
self.AppendCCA = *jsnCfg.Append_cca
|
||||
if jsnCfg.Request_fields != nil {
|
||||
dap.RequestFields = FCTemplatesFromFCTemapltesJsonCfg(*jsnCfg.Request_fields)
|
||||
}
|
||||
if jsnCfg.CCR_fields != nil {
|
||||
if self.CCRFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.CCR_fields); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.CCA_fields != nil {
|
||||
if self.CCAFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.CCA_fields); err != nil {
|
||||
return err
|
||||
}
|
||||
if jsnCfg.Reply_fields != nil {
|
||||
dap.ReplyFields = FCTemplatesFromFCTemapltesJsonCfg(*jsnCfg.Reply_fields)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -327,48 +327,41 @@ type OsipsConnJsonCfg struct {
|
||||
|
||||
// DiameterAgent configuration
|
||||
type DiameterAgentJsonCfg struct {
|
||||
Enabled *bool // enables the diameter agent: <true|false>
|
||||
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
Dictionaries_dir *string // path towards additional dictionaries
|
||||
Sessions_conns *[]*HaPoolJsonCfg // Connections towards generic SM
|
||||
Thresholds_conns *[]*HaPoolJsonCfg // connection towards pubsubs
|
||||
Create_cdr *bool
|
||||
Cdr_requires_session *bool
|
||||
Debit_interval *string
|
||||
Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
Origin_host *string
|
||||
Origin_realm *string
|
||||
Vendor_id *int
|
||||
Product_name *string
|
||||
Request_processors *[]*DARequestProcessorJsnCfg
|
||||
Enabled *bool // enables the diameter agent: <true|false>
|
||||
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
|
||||
Dictionaries_path *string // path towards additional dictionaries
|
||||
Sessions_conns *[]*HaPoolJsonCfg // Connections towards SessionS
|
||||
Origin_host *string
|
||||
Origin_realm *string
|
||||
Vendor_id *int
|
||||
Product_name *string
|
||||
Request_processors *[]*DARequestProcessorJsnCfg
|
||||
}
|
||||
|
||||
// One Diameter request processor configuration
|
||||
type DARequestProcessorJsnCfg struct {
|
||||
Id *string
|
||||
Dry_run *bool
|
||||
Thresholds_event *bool
|
||||
Request_filter *string
|
||||
Tenant *string
|
||||
Filters *[]string
|
||||
Flags *[]string
|
||||
Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
Continue_on_success *bool
|
||||
Append_cca *bool
|
||||
CCR_fields *[]*CdrFieldJsonCfg
|
||||
CCA_fields *[]*CdrFieldJsonCfg
|
||||
Request_fields *[]*FcTemplateJsonCfg
|
||||
Reply_fields *[]*FcTemplateJsonCfg
|
||||
}
|
||||
|
||||
// Radius Agent configuration section
|
||||
type RadiusAgentJsonCfg struct {
|
||||
Enabled *bool
|
||||
Listen_net *string
|
||||
Listen_auth *string
|
||||
Listen_acct *string
|
||||
Client_secrets *map[string]string
|
||||
Client_dictionaries *map[string]string
|
||||
Sessions_conns *[]*HaPoolJsonCfg
|
||||
Tenant *string
|
||||
Cdr_requires_session *bool
|
||||
Timezone *string
|
||||
Request_processors *[]*RAReqProcessorJsnCfg
|
||||
Enabled *bool
|
||||
Listen_net *string
|
||||
Listen_auth *string
|
||||
Listen_acct *string
|
||||
Client_secrets *map[string]string
|
||||
Client_dictionaries *map[string]string
|
||||
Sessions_conns *[]*HaPoolJsonCfg
|
||||
Tenant *string
|
||||
Timezone *string
|
||||
Request_processors *[]*RAReqProcessorJsnCfg
|
||||
}
|
||||
|
||||
type RAReqProcessorJsnCfg struct {
|
||||
|
||||
@@ -31,7 +31,6 @@ type RadiusAgentCfg struct {
|
||||
ClientDictionaries map[string]string
|
||||
SessionSConns []*HaPoolConfig
|
||||
Tenant RSRParsers
|
||||
CDRRequiresSession bool
|
||||
Timezone string
|
||||
RequestProcessors []*RARequestProcessor
|
||||
}
|
||||
@@ -78,9 +77,6 @@ func (self *RadiusAgentCfg) loadFromJsonCfg(jsnCfg *RadiusAgentJsonCfg) (err err
|
||||
if jsnCfg.Tenant != nil {
|
||||
self.Tenant = NewRSRParsersMustCompile(*jsnCfg.Tenant, true)
|
||||
}
|
||||
if jsnCfg.Cdr_requires_session != nil {
|
||||
self.CDRRequiresSession = *jsnCfg.Cdr_requires_session
|
||||
}
|
||||
if jsnCfg.Timezone != nil {
|
||||
self.Timezone = *jsnCfg.Timezone
|
||||
}
|
||||
|
||||
@@ -1898,7 +1898,7 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
|
||||
rply.MaxUsage = &maxUsage
|
||||
}
|
||||
}
|
||||
if smg.thdS != nil && args.ProcessThresholds {
|
||||
if args.ProcessThresholds {
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
@@ -1913,7 +1913,7 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
|
||||
}
|
||||
rply.ThresholdIDs = &tIDs
|
||||
}
|
||||
if smg.statS != nil && args.ProcessStats {
|
||||
if args.ProcessStats {
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatService)
|
||||
}
|
||||
@@ -2120,7 +2120,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
|
||||
return utils.NewErrResourceS(err)
|
||||
}
|
||||
}
|
||||
if smg.thdS != nil && args.ProcessThresholds {
|
||||
if args.ProcessThresholds {
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
@@ -2134,7 +2134,10 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
}
|
||||
}
|
||||
if smg.statS != nil && args.ProcessStats {
|
||||
if args.ProcessStats {
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatS)
|
||||
}
|
||||
var statReply []string
|
||||
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
@@ -2153,20 +2156,25 @@ func (smg *SMGeneric) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
|
||||
return smg.cdrsrv.Call(utils.CdrsV2ProcessCDR, cgrEv, reply)
|
||||
}
|
||||
|
||||
func NewV1ProcessEventArgs(resrc, acnts, attrs bool,
|
||||
func NewV1ProcessEventArgs(resrc, acnts, attrs, thds, stats bool,
|
||||
cgrEv utils.CGREvent) *V1ProcessEventArgs {
|
||||
return &V1ProcessEventArgs{
|
||||
AllocateResources: resrc,
|
||||
Debit: acnts,
|
||||
GetAttributes: attrs,
|
||||
ProcessThresholds: thds,
|
||||
ProcessStats: stats,
|
||||
CGREvent: cgrEv,
|
||||
}
|
||||
}
|
||||
|
||||
type V1ProcessEventArgs struct {
|
||||
GetAttributes bool
|
||||
AllocateResources bool
|
||||
Debit bool
|
||||
GetAttributes bool
|
||||
ProcessThresholds bool
|
||||
ProcessStats bool
|
||||
|
||||
utils.CGREvent
|
||||
}
|
||||
|
||||
@@ -2207,6 +2215,25 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
|
||||
if args.CGREvent.ID == "" {
|
||||
args.CGREvent.ID = utils.GenUUID()
|
||||
}
|
||||
if args.GetAttributes {
|
||||
if smg.attrS == nil {
|
||||
return utils.NewErrNotConnected(utils.AttributeS)
|
||||
}
|
||||
if args.CGREvent.Context == nil { // populate if not already in
|
||||
args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS)
|
||||
}
|
||||
attrArgs := &engine.AttrArgsProcessEvent{
|
||||
CGREvent: args.CGREvent,
|
||||
}
|
||||
var rplyEv engine.AttrSProcessEventReply
|
||||
if err := smg.attrS.Call(utils.AttributeSv1ProcessEvent,
|
||||
attrArgs, &rplyEv); err == nil {
|
||||
args.CGREvent = *rplyEv.CGREvent
|
||||
rply.Attributes = &rplyEv
|
||||
} else if err.Error() != utils.ErrNotFound.Error() {
|
||||
return utils.NewErrAttributeS(err)
|
||||
}
|
||||
}
|
||||
if args.AllocateResources {
|
||||
if smg.resS == nil {
|
||||
return utils.NewErrNotConnected(utils.ResourceS)
|
||||
@@ -2238,22 +2265,30 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
|
||||
rply.MaxUsage = &maxUsage
|
||||
}
|
||||
}
|
||||
if args.GetAttributes {
|
||||
if smg.attrS == nil {
|
||||
return utils.NewErrNotConnected(utils.AttributeS)
|
||||
if args.ProcessThresholds {
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
if args.CGREvent.Context == nil {
|
||||
args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS)
|
||||
}
|
||||
attrArgs := &engine.AttrArgsProcessEvent{
|
||||
var tIDs []string
|
||||
thEv := &engine.ArgsProcessEvent{
|
||||
CGREvent: args.CGREvent,
|
||||
}
|
||||
var rplyEv engine.AttrSProcessEventReply
|
||||
if err = smg.attrS.Call(utils.AttributeSv1ProcessEvent,
|
||||
attrArgs, &rplyEv); err != nil {
|
||||
return utils.NewErrAttributeS(err)
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
}
|
||||
}
|
||||
if args.ProcessStats {
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatS)
|
||||
}
|
||||
var statReply []string
|
||||
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with StatS.", err.Error(), args.CGREvent))
|
||||
}
|
||||
rply.Attributes = &rplyEv
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user