New agents package, ListenAndServe for DiameterAgent, integration tests, adding fiorix/go-diameter in the list of dependencies

This commit is contained in:
DanB
2015-11-14 18:56:14 +01:00
parent 72a51fe1d9
commit 8af6f61bb5
13 changed files with 327 additions and 23 deletions

63
agents/dmtagent.go Normal file
View File

@@ -0,0 +1,63 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2015 ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/fiorix/go-diameter/diam"
//"github.com/fiorix/go-diameter/diam/avp"
"github.com/fiorix/go-diameter/diam/datatype"
//"github.com/fiorix/go-diameter/diam/dict"
"github.com/fiorix/go-diameter/diam/sm"
)
func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient) *DiameterAgent {
return &DiameterAgent{cgrCfg: cgrCfg, smg: smg}
}
type DiameterAgent struct {
cgrCfg *config.CGRConfig
smg *rpcclient.RpcClient // Connection towards CGR-SMG component
}
// Creates the message handlers
func (self *DiameterAgent) handlers() diam.Handler {
settings := &sm.Settings{
OriginHost: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginHost),
OriginRealm: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginRealm),
VendorID: datatype.Unsigned32(self.cgrCfg.DiameterAgentCfg().VendorId),
ProductName: datatype.UTF8String(self.cgrCfg.DiameterAgentCfg().ProductName),
FirmwareRevision: datatype.Unsigned32(utils.DIAMETER_FIRMWARE_REVISION),
}
dSM := sm.New(settings)
dSM.HandleFunc("ALL", self.handleALL)
return dSM
}
func (self *DiameterAgent) handleALL(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(fmt.Sprintf("<DiameterAgent> Received unexpected message from %s:\n%s", c.RemoteAddr(), m))
}
func (self *DiameterAgent) ListenAndServe() error {
return diam.ListenAndServe(self.cgrCfg.DiameterAgentCfg().Listen, self.handlers(), nil)
}

View File

@@ -0,0 +1,89 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"flag"
"path"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args
var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var daCfgPath string
var daCfg *config.CGRConfig
func TestDmtAgentInitCfg(t *testing.T) {
if !*testIntegration {
return
}
daCfgPath = path.Join(*dataDir, "conf", "samples", "dmtagent")
// Init config first
var err error
daCfg, err = config.NewCGRConfigFromFolder(daCfgPath)
if err != nil {
t.Error(err)
}
daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(daCfg)
}
// Remove data in both rating and accounting db
func TestDmtAgentResetDataDb(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.InitDataDb(daCfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func TestDmtAgentResetStorDb(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.InitStorDb(daCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestDmtAgentStartEngine(t *testing.T) {
if !*testIntegration {
return
}
if _, err := engine.StopStartEngine(daCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func TestDmtAgentStopEngine(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -29,6 +29,7 @@ import (
"strconv"
"time"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/balancer2go"
@@ -135,7 +136,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
}
}
func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Server, exitChan chan bool) {
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Generic service.")
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
@@ -184,6 +185,7 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv
// Register RPC handler
smgRpc := v1.NewSMGenericV1(sm)
server.RpcRegister(smgRpc)
internalSMGChan <- smgRpc
// Register BiRpc handlers
smgBiRpc := v1.NewSMGenericBiRpcV1(sm)
for method, handler := range smgBiRpc.Handlers() {
@@ -194,6 +196,29 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv
server.BijsonRegisterOnDisconnect(smg_econns.OnClientDisconnect)
}
func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
var smgConn *rpcclient.RpcClient
var err error
if cfg.DiameterAgentCfg().SMGeneric == utils.INTERNAL {
smgRpc := <-internalSMGChan
internalSMGChan <- smgRpc
smgConn, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, smgRpc)
} else {
smgConn, err = rpcclient.NewRpcClient("tcp", cfg.DiameterAgentCfg().SMGeneric, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
}
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
return
}
da := agents.NewDiameterAgent(cfg, smgConn)
if err = da.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
}
exitChan <- true
}
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
var raterConn, cdrsConn engine.Connector
@@ -643,7 +668,7 @@ func main() {
internalPubSubSChan := make(chan engine.PublisherSubscriber, 1)
internalUserSChan := make(chan engine.UserService, 1)
internalAliaseSChan := make(chan engine.AliasService, 1)
internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
// Start balancer service
if cfg.BalancerEnabled {
go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity
@@ -675,7 +700,7 @@ func main() {
// Start SM-Generic
if cfg.SmGenericConfig.Enabled {
go startSmGeneric(internalRaterChan, server, exitChan)
go startSmGeneric(internalSMGChan, internalRaterChan, server, exitChan)
}
// Start SM-FreeSWITCH
if cfg.SmFsConfig.Enabled {
@@ -700,6 +725,10 @@ func main() {
server.RpcRegister(smRpc)
}
if cfg.DiameterAgentCfg().Enabled {
go startDiameterAgent(internalSMGChan, exitChan)
}
// Start HistoryS service
if cfg.HistoryServerEnabled {
go startHistoryServer(internalHistorySChan, server, exitChan)

View File

@@ -65,7 +65,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.SmFsConfig = new(SmFsConfig)
cfg.SmKamConfig = new(SmKamConfig)
cfg.SmOsipsConfig = new(SmOsipsConfig)
cfg.DiameterAgentCfg = new(DiameterAgentCfg)
cfg.diameterAgentCfg = new(DiameterAgentCfg)
cfg.ConfigReloads = make(map[string]chan struct{})
cfg.ConfigReloads[utils.CDRC] = make(chan struct{}, 1)
cfg.ConfigReloads[utils.CDRC] <- struct{}{} // Unlock the channel
@@ -232,7 +232,7 @@ type CGRConfig struct {
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration
DiameterAgentCfg *DiameterAgentCfg // DiameterAgent configuration
diameterAgentCfg *DiameterAgentCfg // DiameterAgent configuration
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryDir string // Location on disk where to store history files.
@@ -379,8 +379,8 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// DAgent checks
if self.DiameterAgentCfg.Enabled {
if self.DiameterAgentCfg.SMGeneric == utils.INTERNAL && !self.SmGenericConfig.Enabled {
if self.diameterAgentCfg.Enabled {
if self.diameterAgentCfg.SMGeneric == utils.INTERNAL && !self.SmGenericConfig.Enabled {
return errors.New("SMGeneric not enabled but referenced by DiameterAgent component")
}
}
@@ -798,7 +798,7 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
if jsnDACfg != nil {
if err := self.DiameterAgentCfg.loadFromJsonCfg(jsnDACfg); err != nil {
if err := self.diameterAgentCfg.loadFromJsonCfg(jsnDACfg); err != nil {
return err
}
}
@@ -868,7 +868,12 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
// Use locking to retrieve the configuration, possibility later for runtime reload
func (self *CGRConfig) SureTaxCfg() *SureTaxCfg {
cfgChan := <-self.ConfigReloads[utils.SURETAX] // Lock config for read or reloads
stCfg := self.sureTaxCfg
self.ConfigReloads[utils.SURETAX] <- cfgChan // unlock config for reloads or read
return stCfg
defer func() { self.ConfigReloads[utils.SURETAX] <- cfgChan }()
return self.sureTaxCfg
}
func (self *CGRConfig) DiameterAgentCfg() *DiameterAgentCfg {
cfgChan := <-self.ConfigReloads[utils.DIAMETER_AGENT] // Lock config for read or reloads
defer func() { self.ConfigReloads[utils.DIAMETER_AGENT] <- cfgChan }()
return self.diameterAgentCfg
}

View File

@@ -203,8 +203,8 @@ const CGRATES_CFG_JSON = `
"listen_bijson": "127.0.0.1:2014", // address where to listen for bidirectional JSON-RPC requests
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server <""|internal|x.y.z.y:1234>
"debit_interval": "10s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"debit_interval": "0", // interval to perform debits on.
"min_call_duration": "0", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
},
@@ -262,8 +262,12 @@ const CGRATES_CFG_JSON = `
"diameter_agent": {
"enabled": false, // enables the diameter agent: <true|false>
"listen": "127.0.0.1:3868", // address where to listen for diameter requests <x.y.z.y:1234>
"sm_generic": "internal", // Connection towards SMG component for session management
"sm_generic": "internal", // connection towards SMG component for session management
"timezone": "", // timezone for timestamps where not specified, empty for general defaults <""|UTC|Local|$IANA_TZ_DB>
"origin_host": "diameter-agent", // diameter Origin-Host AVP used in replies
"origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies
"vendor_id": 0, // diameter Vendor-Id AVP used in replies
"product_name": "CGRateS", // diameter Product-Name AVP used in replies
"request_processors": [
{
"id": "*default", // Identifier of this processor

View File

@@ -330,8 +330,8 @@ func TestSmGenericJsonCfg(t *testing.T) {
Listen_bijson: utils.StringPointer("127.0.0.1:2014"),
Rater: utils.StringPointer("internal"),
Cdrs: utils.StringPointer("internal"),
Debit_interval: utils.StringPointer("10s"),
Min_call_duration: utils.StringPointer("0s"),
Debit_interval: utils.StringPointer("0"),
Min_call_duration: utils.StringPointer("0"),
Max_call_duration: utils.StringPointer("3h"),
}
if cfg, err := dfCgrJsonCfg.SmGenericJsonCfg(); err != nil {
@@ -416,10 +416,14 @@ func TestSmOsipsJsonCfg(t *testing.T) {
func TestDiameterAgentJsonCfg(t *testing.T) {
eCfg := &DiameterAgentJsonCfg{
Enabled: utils.BoolPointer(false),
Listen: utils.StringPointer("127.0.0.1:3868"),
Sm_generic: utils.StringPointer("internal"),
Timezone: utils.StringPointer(""),
Enabled: utils.BoolPointer(false),
Listen: utils.StringPointer("127.0.0.1:3868"),
Sm_generic: utils.StringPointer("internal"),
Timezone: utils.StringPointer(""),
Origin_host: utils.StringPointer("diameter-agent"),
Origin_realm: utils.StringPointer("cgrates.org"),
Vendor_id: utils.IntPointer(0),
Product_name: utils.StringPointer("CGRateS"),
Request_processors: &[]*DARequestProcessorJsnCfg{
&DARequestProcessorJsnCfg{
Id: utils.StringPointer("*default"),

View File

@@ -29,6 +29,10 @@ type DiameterAgentCfg struct {
Listen string // address where to listen for diameter requests <x.y.z.y:1234>
SMGeneric string // connection towards SMG component
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
OriginHost string
OriginRealm string
VendorId int
ProductName string
RequestProcessors []*DARequestProcessor
}
@@ -48,6 +52,18 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro
if jsnCfg.Timezone != nil {
self.Timezone = *jsnCfg.Timezone
}
if jsnCfg.Origin_host != nil {
self.OriginHost = *jsnCfg.Origin_host
}
if jsnCfg.Origin_realm != nil {
self.OriginRealm = *jsnCfg.Origin_realm
}
if jsnCfg.Vendor_id != nil {
self.VendorId = *jsnCfg.Vendor_id
}
if jsnCfg.Product_name != nil {
self.ProductName = *jsnCfg.Product_name
}
if jsnCfg.Request_processors != nil {
for _, reqProcJsn := range *jsnCfg.Request_processors {
rp := new(DARequestProcessor)

View File

@@ -241,6 +241,10 @@ type DiameterAgentJsonCfg struct {
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
Sm_generic *string // Connection towards generic SM
Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
Origin_host *string
Origin_realm *string
Vendor_id *int
Product_name *string
Request_processors *[]*DARequestProcessorJsnCfg
}

View File

@@ -0,0 +1,82 @@
{
// CGRateS Configuration file
//
// Used for cgradmin
// Starts rater, scheduler
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"rater": {
"enabled": true, // enable Rater service: <true|false>
"cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "internal",
},
"scheduler": {
"enabled": true, // start Scheduler service: <true|false>
},
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234>
"cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
},
"pubsubs": {
"enabled": true, // starts PubSub service: <true|false>.
},
"aliases": {
"enabled": true, // starts Aliases service: <true|false>.
},
"users": {
"enabled": true, // starts User service: <true|false>.
"indexes": ["Uuid"], // user profile field indexes
},
"sm_generic": {
"enabled": true, // starts SessionManager service: <true|false>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server <""|internal|x.y.z.y:1234>
},
"diameter_agent": {
"enabled": true, // enables the diameter agent: <true|false>
"listen": "127.0.0.1:3868", // address where to listen for diameter requests <x.y.z.y:1234>
"request_processors": [
{
"id": "*default", // Identifier of this processor
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"request_filter": "Subscription-Id>Subscription-Type(0)", // filter requests processed by this processor
"continue_on_success": false, // continue to the next template if executed
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "tor", "cdr_field_id": "TOR", "type": "cdrfield", "value": "^*voice", "mandatory": true},
{"tag": "accid", "cdr_field_id": "AccId", "type": "cdrfield", "value": "Session-Id", "mandatory": true},
{"tag": "reqtype", "cdr_field_id": "ReqType", "type": "cdrfield", "value": "^*users", "mandatory": true},
{"tag": "direction", "cdr_field_id": "Direction", "type": "cdrfield", "value": "^*out", "mandatory": true},
{"tag": "tenant", "cdr_field_id": "Tenant", "type": "cdrfield", "value": "^*users", "mandatory": true},
{"tag": "category", "cdr_field_id": "Category", "type": "cdrfield", "value": "^call_;~Calling-Vlr-Number:s/^$/33000/;~Calling-Vlr-Number:s/^(\\d{5})/${1}/", "mandatory": true},
{"tag": "account", "cdr_field_id": "Account", "type": "cdrfield", "value": "^*users", "mandatory": true},
{"tag": "subject", "cdr_field_id": "Subject", "type": "cdrfield", "value": "^*users", "mandatory": true},
{"tag": "destination", "cdr_field_id": "Destination", "type": "cdrfield", "value": "Real-Called-Number", "mandatory": true},
{"tag": "setup_time", "cdr_field_id": "SetupTime", "type": "cdrfield", "value": "Event-Time", "mandatory": true},
{"tag": "answer_time", "cdr_field_id": "AnswerTime", "type": "cdrfield", "value": "Event-Time", "mandatory": true},
{"tag": "usage", "cdr_field_id": "Usage", "type": "cdrfield", "value": "CC-Time", "mandatory": true},
{"tag": "subscriber_id", "cdr_field_id": "SubscriberId", "type": "cdrfield", "value": "Subscription-Id>Subscription-Id-Data", "mandatory": true},
],
},
],
},
}

View File

@@ -19,3 +19,4 @@ import:
- package: github.com/cgrates/fsock
- package: github.com/cenkalti/rpc2
- package: github.com/cenkalti/hub
- package: github.com/fiorix/go-diameter

View File

@@ -5,7 +5,7 @@ import:
- package: github.com/jinzhu/gorm
ref: 611e613459953787a01c2afc82835aa0ba01a045
- package: golang.org/x/net
ref: c764672d0ee39ffd83cfcb375804d3181302b62b
ref: 1d9fd3b8333e891c0e7353e1adcfe8a612573033
- package: github.com/DisposaBoy/JsonConfigReader
ref: 33a99fdf1d5ee1f79b5077e9c06f955ad356d5f4
- package: github.com/hoisie/redis
@@ -25,7 +25,7 @@ import:
- package: gopkg.in/fsnotify.v1
ref: 7be54206639f256967dd82fa767397ba5f8f48f5
- package: github.com/peterh/liner
ref: 32e535aff4145c12d1e154754ab144b49ab578e2
ref: 506a2f061bd0b362a8a2fa445be80386f39dedd8
- package: github.com/cgrates/rpcclient
ref: 028c43fc34d32dc9095c7605e2e455e0c7a5ea69
- package: github.com/cgrates/osipsdagram
@@ -38,3 +38,5 @@ import:
ref: 2d1be381ce47537e9e076b2b76dc70933162e4e9
- package: github.com/cenkalti/hub
ref: 57d753b5f4856e77b3cf8ecce78c97215a7d324d
- package: github.com/fiorix/go-diameter
ref: b4c1bac20b8e8e1ac7e17fb54dc83b155aacba21

View File

@@ -11,6 +11,7 @@ go test -i github.com/cgrates/cgrates/cdrc
go test -i github.com/cgrates/cgrates/utils
go test -i github.com/cgrates/cgrates/history
go test -i github.com/cgrates/cgrates/cdre
go test -i github.com/cgrates/cgrates/agents
go test github.com/cgrates/cgrates/apier/v1
v1=$?
@@ -38,5 +39,8 @@ go test github.com/cgrates/cgrates/cache2go
c2g=$?
go test github.com/cgrates/cgrates/cdre
cdre=$?
go test github.com/cgrates/cgrates/agents
ag=$?
exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $con && $cdrc && $ut && $hs && $c2g && $cdre
exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $con && $cdrc && $ut && $hs && $c2g && $cdre && $ag

View File

@@ -30,6 +30,7 @@ var (
const (
VERSION = "0.9.1~rc8"
DIAMETER_FIRMWARE_REVISION = 918
POSTGRES = "postgres"
MYSQL = "mysql"
MONGO = "mongo"