Diameter event publishing, fixes #263

This commit is contained in:
DanB
2016-01-24 16:04:10 +01:00
parent cc5ec5177c
commit 5b828ec98f
9 changed files with 166 additions and 31 deletions

View File

@@ -30,8 +30,8 @@ import (
"github.com/fiorix/go-diameter/diam/sm"
)
func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient) (*DiameterAgent, error) {
da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg}
func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient, pubsubs *rpcclient.RpcClient) (*DiameterAgent, error) {
da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg, pubsubs: pubsubs}
dictsDir := cgrCfg.DiameterAgentCfg().DictionariesDir
if len(dictsDir) != 0 {
if err := loadDictionaries(dictsDir, "DiameterAgent"); err != nil {
@@ -42,8 +42,9 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient) (*Diam
}
type DiameterAgent struct {
cgrCfg *config.CGRConfig
smg *rpcclient.RpcClient // Connection towards CGR-SMG component
cgrCfg *config.CGRConfig
smg *rpcclient.RpcClient // Connection towards CGR-SMG component
pubsubs *rpcclient.RpcClient // Connection towards CGR-PubSub component
}
// Creates the message handlers
@@ -91,6 +92,18 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err))
return cca
}
if reqProcessor.PublishEvent && self.pubsubs != nil {
evt, err := smgEv.AsMapStringString()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed converting SMGEvent to pubsub one, error: %s", ccr.diamMessage, err))
return nil
}
var reply string
if err := self.pubsubs.Call("PubSubV1.Publish", evt, reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> Processing message: %+v failed publishing event, error: %s", ccr.diamMessage, err))
return nil
}
}
var maxUsage float64
if reqProcessor.DryRun { // DryRun does not send over network
utils.Logger.Info(fmt.Sprintf("<DiameterAgent> SMGenericEvent: %+v", smgEv))

View File

@@ -196,9 +196,9 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
server.BijsonRegisterOnDisconnect(smg_econns.OnClientDisconnect)
}
func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
var smgConn *rpcclient.RpcClient
var smgConn, pubsubConn *rpcclient.RpcClient
var err error
if cfg.DiameterAgentCfg().SMGeneric == utils.INTERNAL {
smgRpc := <-internalSMGChan
@@ -212,7 +212,19 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
exitChan <- true
return
}
da, err := agents.NewDiameterAgent(cfg, smgConn)
if cfg.DiameterAgentCfg().PubSubS == utils.INTERNAL {
pubSubRpc := <-internalPubSubSChan
internalPubSubSChan <- pubSubRpc
pubsubConn, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, pubSubRpc)
} else if len(cfg.DiameterAgentCfg().PubSubS) != 0 {
pubsubConn, err = rpcclient.NewRpcClient("tcp", cfg.DiameterAgentCfg().PubSubS, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
}
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
exitChan <- true
return
}
da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
exitChan <- true
@@ -372,7 +384,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
}
func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage,
internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber,
internalRaterChan chan *engine.Responder, internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
internalCdrStatSChan chan engine.StatsInterface, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
@@ -394,23 +406,19 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
raterConn = &engine.RPCClientConnector{Client: client}
}
// Pubsub connection init
var pubSubConn engine.PublisherSubscriber
var pubSubConn rpcclient.RpcClientConnection
if cfg.CDRSPubSub == utils.INTERNAL {
pubSubs := <-internalPubSubSChan
pubSubConn = pubSubs
internalPubSubSChan <- pubSubs
} else if len(cfg.CDRSPubSub) != 0 {
if cfg.CDRSRater == cfg.CDRSPubSub {
pubSubConn = &engine.ProxyPubSub{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to pubsub server: %s", err.Error()))
exitChan <- true
return
}
pubSubConn = &engine.ProxyPubSub{Client: client}
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to pubsub server: %s", err.Error()))
exitChan <- true
return
}
pubSubConn = client
}
// Users connection init
var usersConn engine.UserService
@@ -515,7 +523,7 @@ func startHistoryServer(internalHistorySChan chan history.Scribe, server *utils.
internalHistorySChan <- scribeServer
}
func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *utils.Server) {
func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) {
pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
server.RpcRegisterName("PubSubV1", pubSubServer)
internalPubSubSChan <- pubSubServer
@@ -548,7 +556,7 @@ func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder,
internalCdrSChan chan *engine.CdrServer,
internalCdrStatSChan chan engine.StatsInterface,
internalHistorySChan chan history.Scribe,
internalPubSubSChan chan engine.PublisherSubscriber,
internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan engine.UserService,
internalAliaseSChan chan engine.AliasService) {
select { // Any of the rpc methods will unlock listening to rpc requests
@@ -674,7 +682,7 @@ func main() {
internalCdrSChan := make(chan *engine.CdrServer, 1)
internalCdrStatSChan := make(chan engine.StatsInterface, 1)
internalHistorySChan := make(chan history.Scribe, 1)
internalPubSubSChan := make(chan engine.PublisherSubscriber, 1)
internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1)
internalUserSChan := make(chan engine.UserService, 1)
internalAliaseSChan := make(chan engine.AliasService, 1)
internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -735,7 +743,7 @@ func main() {
}
if cfg.DiameterAgentCfg().Enabled {
go startDiameterAgent(internalSMGChan, exitChan)
go startDiameterAgent(internalSMGChan, internalPubSubSChan, exitChan)
}
// Start HistoryS service

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) {
@@ -41,7 +42,7 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
// Starts rater and reports on chan
func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe,
internalPubSubSChan chan engine.PublisherSubscriber, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
server *utils.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
stopHandled *bool, exitChan chan bool) {
@@ -163,7 +164,7 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer close(pubsubTaskChan)
var pubSubServer engine.PublisherSubscriber
var pubSubServer rpcclient.RpcClientConnection
if cfg.RaterPubSubServer == utils.INTERNAL {
select {
case pubSubServer = <-internalPubSubSChan:
@@ -173,7 +174,7 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str
exitChan <- true
return
}
} else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil {
} else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to pubsubs: %s", err.Error()))
exitChan <- true
return

View File

@@ -30,6 +30,7 @@ import (
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
@@ -76,7 +77,7 @@ var (
debitPeriod = 10 * time.Second
globalRoundingDecimals = 5
historyScribe history.Scribe
pubSubServer PublisherSubscriber
pubSubServer rpcclient.RpcClientConnection
userService UserService
aliasService AliasService
)
@@ -114,7 +115,7 @@ func SetHistoryScribe(scribe history.Scribe) {
historyScribe = scribe
}
func SetPubSub(ps PublisherSubscriber) {
func SetPubSub(ps rpcclient.RpcClientConnection) {
pubSubServer = ps
}
@@ -129,7 +130,7 @@ func SetAliasService(as AliasService) {
func Publish(event CgrEvent) {
if pubSubServer != nil {
var s string
pubSubServer.Publish(event, &s)
pubSubServer.Call("PubSubV1.Publish", event, &s)
}
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/jinzhu/gorm"
mgov2 "gopkg.in/mgo.v2"
)
@@ -66,7 +67,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub rpcclient.RpcClientConnection, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil
}
@@ -74,7 +75,7 @@ type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
pubsub PublisherSubscriber
pubsub rpcclient.RpcClientConnection
users UserService
aliases AliasService
stats StatsInterface

View File

@@ -165,6 +165,53 @@ func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) er
return nil
}
// rpcclient.RpcClientConnection interface
func (ps *PubSub) Call(serviceMethod string, args interface{}, reply interface{}) error {
switch serviceMethod {
case "PubSubV1.Subscribe":
argsConverted, canConvert := args.(SubscribeInfo)
if !canConvert {
return rpcclient.ErrWrongArgsType
}
replyConverted, canConvert := reply.(*string)
if !canConvert {
return rpcclient.ErrWrongReplyType
}
return ps.Subscribe(argsConverted, replyConverted)
case "PubSubV1.Unsubscribe":
argsConverted, canConvert := args.(SubscribeInfo)
if !canConvert {
return rpcclient.ErrWrongArgsType
}
replyConverted, canConvert := reply.(*string)
if !canConvert {
return rpcclient.ErrWrongReplyType
}
return ps.Unsubscribe(argsConverted, replyConverted)
case "PubSubV1.Publish":
argsConverted, canConvert := args.(CgrEvent)
if !canConvert {
return rpcclient.ErrWrongArgsType
}
replyConverted, canConvert := reply.(*string)
if !canConvert {
return rpcclient.ErrWrongReplyType
}
return ps.Publish(argsConverted, replyConverted)
case "PubSubV1.ShowSubscribers":
argsConverted, canConvert := args.(string)
if !canConvert {
return rpcclient.ErrWrongArgsType
}
replyConverted, canConvert := reply.(*map[string]*SubscriberData)
if !canConvert {
return rpcclient.ErrWrongReplyType
}
return ps.ShowSubscribers(argsConverted, replyConverted)
}
return rpcclient.ErrUnsupporteServiceMethod
}
type ProxyPubSub struct {
Client *rpcclient.RpcClient
}

View File

@@ -20,6 +20,7 @@ package sessionmanager
import (
"encoding/json"
"fmt"
"strconv"
"time"
@@ -348,3 +349,16 @@ func (self SMGenericEvent) AsLcrRequest() *engine.LcrRequest {
Duration: usageStr,
}
}
// AsMapStringString Converts into map[string]string, used for example as pubsub event
func (self SMGenericEvent) AsMapStringString() (map[string]string, error) {
mp := make(map[string]string)
for k, v := range self {
if strV, casts := utils.CastIfToString(v); !casts {
return nil, fmt.Errorf("Value %+v does not cast to string", v)
} else {
mp[k] = strV
}
}
return mp, nil
}

View File

@@ -474,3 +474,32 @@ func FmtFieldWidth(source string, width int, strip, padding string, mandatory bo
}
return source, nil
}
// Returns the string representation of iface or error if not convertible
func CastIfToString(iface interface{}) (strVal string, casts bool) {
switch iface.(type) {
case string:
strVal = iface.(string)
casts = true
case int:
strVal = strconv.Itoa(iface.(int))
casts = true
case int64:
strVal = strconv.FormatInt(iface.(int64), 10)
casts = true
case float64:
strVal = strconv.FormatFloat(iface.(float64), 'f', -1, 64)
casts = true
case bool:
strVal = strconv.FormatBool(iface.(bool))
casts = true
case []uint8:
var byteVal []byte
if byteVal, casts = iface.([]byte); casts {
strVal = string(byteVal)
}
default: // Maybe we are lucky and the value converts to string
strVal, casts = iface.(string)
}
return strVal, casts
}

View File

@@ -579,3 +579,24 @@ func TestPaddingNotAllowed(t *testing.T) {
t.Error("Expected error")
}
}
func TestCastIfToString(t *testing.T) {
v := interface{}("somestr")
if sOut, casts := CastIfToString(v); !casts {
t.Error("Does not cast")
} else if sOut != "somestr" {
t.Errorf("Received: %+v", sOut)
}
v = interface{}(1)
if sOut, casts := CastIfToString(v); !casts {
t.Error("Does not cast")
} else if sOut != "1" {
t.Errorf("Received: %+v", sOut)
}
v = interface{}(1.2)
if sOut, casts := CastIfToString(v); !casts {
t.Error("Does not cast")
} else if sOut != "1.2" {
t.Errorf("Received: %+v", sOut)
}
}