added pubsub configs

This commit is contained in:
Radu Ioan Fericean
2015-07-01 19:25:47 +03:00
parent 66ac4b194c
commit 02dc95d815
10 changed files with 200 additions and 22 deletions

View File

@@ -35,6 +35,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/pubsub"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
@@ -67,6 +68,7 @@ var (
exitChan = make(chan bool)
server = &engine.Server{}
scribeServer history.Scribe
pubSubServer pubsub.PublisherSubscriber
cdrServer *engine.CdrServer
cdrStats *engine.Stats
cfg *config.CGRConfig
@@ -363,6 +365,46 @@ func startHistoryAgent(chanServerStarted chan struct{}) {
return
}
func startPubSubServer(chanDone chan struct{}) {
if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil {
engine.Logger.Crit(fmt.Sprintf("<PubSubServer> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
server.RpcRegisterName("PubSub", pubSubServer)
close(chanDone)
}
// chanStartServer will report when server is up, useful for internal requests
func startPubSubAgent(chanServerStarted chan struct{}) {
if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Connecting internally to PubSubServer"))
select {
case <-time.After(1 * time.Minute):
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Timeout waiting for server to start."))
exitChan <- true
return
case <-chanServerStarted:
}
//<-chanServerStarted // If server is not enabled, will have deadlock here
} else { // Connect in iteration since there are chances of concurrency here
delay := utils.Fib()
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
//engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil {
break //Connected so no need to reiterate
} else if i == 2 && err != nil {
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Could not connect to the server, error: %s", err.Error()))
exitChan <- true
return
}
time.Sleep(delay())
}
}
engine.SetPubSub(pubSubServer) // scribeServer comes from global variable
return
}
// Starts the rpc server, waiting for the necessary components to finish their tasks
func serveRpc(rpcWaitChans []chan struct{}) {
for _, chn := range rpcWaitChans {
@@ -579,6 +621,18 @@ func main() {
go startHistoryAgent(histServChan)
}
var pubsubServChan chan struct{} // Will be initialized only if the server starts
if cfg.PubSubServerEnabled {
pubsubServChan = make(chan struct{})
rpcWait = append(rpcWait, pubsubServChan)
go startPubSubServer(pubsubServChan)
}
if cfg.PubSubAgentEnabled {
engine.Logger.Info("Starting CGRateS PubSub Agent.")
go startPubSubAgent(pubsubServChan)
}
var cdrsChan chan struct{}
if cfg.CDRSEnabled {
engine.Logger.Info("Starting CGRateS CDRS service.")

View File

@@ -213,7 +213,11 @@ type CGRConfig struct {
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryDir string // Location on disk where to store history files.
HistorySaveInterval time.Duration // The timout duration between history writes
HistorySaveInterval time.Duration // The timout duration between pubsub writes
PubSubAgentEnabled bool // Starts PubSub as an agent: <true|false>.
PubSubServer string // Address where to reach the master pubsub server: <internal|x.y.z.y:1234>
PubSubServerEnabled bool // Starts PubSub as server: <true|false>.
PubSubSaveInterval time.Duration // The timout duration between pubsub writes
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
MailerAuthPass string // Authenticate to email server with this password
@@ -319,6 +323,10 @@ func (self *CGRConfig) checkConfigSanity() error {
if self.HistoryAgentEnabled && !self.HistoryServerEnabled {
return errors.New("HistoryServer not enabled but referenced by HistoryAgent component")
}
// PubSubAgent
if self.PubSubAgentEnabled && !self.PubSubServerEnabled {
return errors.New("PubSubServer not enabled but referenced by PubSubAgent component")
}
return nil
}
@@ -411,6 +419,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
return err
}
jsnPubSubServCfg, err := jsnCfg.PubSubServJsonCfg()
if err != nil {
return err
}
jsnPubSubAgentCfg, err := jsnCfg.PubSubAgentJsonCfg()
if err != nil {
return err
}
jsnMailerCfg, err := jsnCfg.MailerJsonCfg()
if err != nil {
return err
@@ -678,6 +696,26 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
if jsnPubSubAgentCfg != nil {
if jsnPubSubAgentCfg.Enabled != nil {
self.PubSubAgentEnabled = *jsnPubSubAgentCfg.Enabled
}
if jsnPubSubAgentCfg.Server != nil {
self.PubSubServer = *jsnPubSubAgentCfg.Server
}
}
if jsnPubSubServCfg != nil {
if jsnPubSubServCfg.Enabled != nil {
self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled
}
if jsnPubSubServCfg.Save_interval != nil {
if self.PubSubSaveInterval, err = utils.ParseDurationWithSecs(*jsnPubSubServCfg.Save_interval); err != nil {
return err
}
}
}
if jsnMailerCfg != nil {
if jsnMailerCfg.Server != nil {
self.MailerServer = *jsnMailerCfg.Server

View File

@@ -243,6 +243,17 @@ const CGRATES_CFG_JSON = `
"server": "internal", // address where to reach the master history server: <internal|x.y.z.y:1234>
},
"pubsub_server": {
"enabled": false, // starts History service: <true|false>.
"save_interval": "1s", // interval to save changed cache into .git archive
},
"pubsub_agent": {
"enabled": false, // starts PubSub as a client: <true|false>.
"server": "internal", // address where to reach the master pubsub server: <internal|x.y.z.y:1234>
},
"mailer": {
"server": "localhost", // the server to use when sending emails out

View File

@@ -20,9 +20,10 @@ package config
import (
"encoding/json"
"github.com/DisposaBoy/JsonConfigReader"
"io"
"os"
"github.com/DisposaBoy/JsonConfigReader"
)
const (
@@ -48,6 +49,8 @@ const (
OSIPS_JSN = "opensips"
HISTSERV_JSN = "history_server"
HISTAGENT_JSN = "history_agent"
PUBSUBSERV_JSN = "pubsub_server"
PUBSUBAGENT_JSN = "pubsub_agent"
MAILER_JSN = "mailer"
)
@@ -254,6 +257,30 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) {
return cfg, nil
}
func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) {
rawCfg, hasKey := self[HISTSERV_JSN]
if !hasKey {
return nil, nil
}
cfg := new(PubSubServJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) {
rawCfg, hasKey := self[HISTAGENT_JSN]
if !hasKey {
return nil, nil
}
cfg := new(PubSubAgentJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) {
rawCfg, hasKey := self[MAILER_JSN]
if !hasKey {

View File

@@ -406,6 +406,30 @@ func TestDfHistAgentJsonCfg(t *testing.T) {
}
}
func TestDfPubSubServJsonCfg(t *testing.T) {
eCfg := &PubSubServJsonCfg{
Enabled: utils.BoolPointer(false),
Save_interval: utils.StringPointer("1s"),
}
if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfPubSubAgentJsonCfg(t *testing.T) {
eCfg := &PubSubAgentJsonCfg{
Enabled: utils.BoolPointer(false),
Server: utils.StringPointer("internal"),
}
if cfg, err := dfCgrJsonCfg.PubSubAgentJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg)
}
}
func TestDfMailerJsonCfg(t *testing.T) {
eCfg := &MailerJsonCfg{
Server: utils.StringPointer("localhost"),
@@ -463,6 +487,11 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) {
} else if cfg != nil {
t.Error("Received: ", cfg)
}
if cfg, err := cgrJsonCfg.PubSubAgentJsonCfg(); err != nil {
t.Error(err)
} else if cfg != nil {
t.Error("Received: ", cfg)
}
eCfgSmFs := &SmFsJsonCfg{
Enabled: utils.BoolPointer(true),
Connections: &[]*FsConnJsonCfg{

View File

@@ -217,6 +217,18 @@ type HistAgentJsonCfg struct {
Server *string
}
// PubSub server config section
type PubSubServJsonCfg struct {
Enabled *bool
Save_interval *string
}
// PubSub agent config section
type PubSubAgentJsonCfg struct {
Enabled *bool
Server *string
}
// Mailer config section
type MailerJsonCfg struct {
Server *string

View File

@@ -222,6 +222,16 @@
// "server": "internal", // address where to reach the master history server: <internal|x.y.z.y:1234>
//},
//"pubsub_server": {
// "enabled": false, // starts pubsub service: <true|false>.
// "save_interval": "1s", // interval to save subscribers
//},
//"pubsub_agent": {
// "enabled": false, // starts pubsub as a client: <true|false>.
// "server": "internal", // address where to reach the master pubsub server: <internal|x.y.z.y:1234>
//},
//"mailer": {
// "server": "localhost", // the server to use when sending emails out

View File

@@ -21,15 +21,15 @@ package engine
import (
"errors"
"fmt"
//"log"
"log/syslog"
"sort"
"strings"
"time"
//"encoding/json"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/pubsub"
"github.com/cgrates/cgrates/utils"
)
@@ -68,6 +68,7 @@ var (
debitPeriod = 10 * time.Second
globalRoundingDecimals = 10
historyScribe history.Scribe
pubSubServer pubsub.PublisherSubscriber
//historyScribe, _ = history.NewMockScribe()
)
@@ -104,6 +105,10 @@ func SetHistoryScribe(scribe history.Scribe) {
historyScribe = scribe
}
func SetPubSub(ps pubsub.PublisherSubscriber) {
pubSubServer = ps
}
/*
The input stucture that contains call information.
*/

View File

@@ -2,12 +2,9 @@ package pubsub
import (
"errors"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -32,14 +29,14 @@ type PublisherSubscriber interface {
type PubSub struct {
subscribers map[string]map[string]time.Time
conf *config.CGRConfig
ttlVerify bool
pubFunc func(string, bool, interface{}) ([]byte, error)
mux *sync.Mutex
}
func NewPubSub(conf *config.CGRConfig) *PubSub {
func NewPubSub(ttlVerify bool) *PubSub {
return &PubSub{
conf: conf,
ttlVerify: ttlVerify,
subscribers: make(map[string]map[string]time.Time),
pubFunc: utils.HttpJsonPost,
mux: &sync.Mutex{},
@@ -84,7 +81,6 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
for transport_address, expTime := range subs {
split := utils.InfieldSplit(transport_address)
if len(split) != 2 {
engine.Logger.Warning("<PubSub> Wrong transport;address pair: " + transport_address)
continue
}
transport := split[0]
@@ -98,11 +94,8 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil {
if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
engine.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"]))
break
}
time.Sleep(delay())
}

View File

@@ -4,12 +4,11 @@ import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestSubscribe(t *testing.T) {
ps := NewPubSub(nil)
ps := NewPubSub(false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
@@ -25,7 +24,7 @@ func TestSubscribe(t *testing.T) {
}
func TestSubscribeNoTransport(t *testing.T) {
ps := NewPubSub(nil)
ps := NewPubSub(false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
@@ -38,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) {
}
func TestSubscribeNoExpire(t *testing.T) {
ps := NewPubSub(nil)
ps := NewPubSub(false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
@@ -54,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) {
}
func TestUnsubscribe(t *testing.T) {
ps := NewPubSub(nil)
ps := NewPubSub(false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
@@ -77,7 +76,7 @@ func TestUnsubscribe(t *testing.T) {
}
func TestPublish(t *testing.T) {
ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true})
ps := NewPubSub(true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
obj.(map[string]string)["called"] = url
return nil, nil
@@ -111,7 +110,7 @@ func TestPublish(t *testing.T) {
}
func TestPublishExpired(t *testing.T) {
ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true})
ps := NewPubSub(true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
m := obj.(map[string]string)
m["called"] = "yes"