diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d18d538b1..e5b9fae17 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -35,6 +35,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" @@ -67,6 +68,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe + pubSubServer pubsub.PublisherSubscriber cdrServer *engine.CdrServer cdrStats *engine.Stats cfg *config.CGRConfig @@ -363,6 +365,46 @@ func startHistoryAgent(chanServerStarted chan struct{}) { return } +func startPubSubServer(chanDone chan struct{}) { + if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("PubSub", pubSubServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startPubSubAgent(chanServerStarted chan struct{}) { + if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting + engine.Logger.Crit(fmt.Sprintf(" Connecting internally to PubSubServer")) + select { + case <-time.After(1 * time.Minute): + engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) + exitChan <- true + return + case <-chanServerStarted: + } + //<-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here + delay := utils.Fib() + for i := 0; i < 3; i++ { //ToDo: Make it globally configurable + //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) + if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil { + break //Connected so no need to reiterate + } else if i == 2 && err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + exitChan <- true + return + } + time.Sleep(delay()) + } + } + engine.SetPubSub(pubSubServer) // scribeServer comes from global variable + return +} + // Starts the rpc server, waiting for the necessary components to finish their tasks func serveRpc(rpcWaitChans []chan struct{}) { for _, chn := range rpcWaitChans { @@ -579,6 +621,18 @@ func main() { go startHistoryAgent(histServChan) } + var pubsubServChan chan struct{} // Will be initialized only if the server starts + if cfg.PubSubServerEnabled { + pubsubServChan = make(chan struct{}) + rpcWait = append(rpcWait, pubsubServChan) + go startPubSubServer(pubsubServChan) + } + + if cfg.PubSubAgentEnabled { + engine.Logger.Info("Starting CGRateS PubSub Agent.") + go startPubSubAgent(pubsubServChan) + } + var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") diff --git a/config/config.go b/config/config.go index 989b0f655..b7226fa61 100644 --- a/config/config.go +++ b/config/config.go @@ -213,7 +213,11 @@ type CGRConfig struct { HistoryServer string // Address where to reach the master history server: HistoryServerEnabled bool // Starts History as server: . HistoryDir string // Location on disk where to store history files. - HistorySaveInterval time.Duration // The timout duration between history writes + HistorySaveInterval time.Duration // The timout duration between pubsub writes + PubSubAgentEnabled bool // Starts PubSub as an agent: . + PubSubServer string // Address where to reach the master pubsub server: + PubSubServerEnabled bool // Starts PubSub as server: . + PubSubSaveInterval time.Duration // The timout duration between pubsub writes MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user MailerAuthPass string // Authenticate to email server with this password @@ -319,6 +323,10 @@ func (self *CGRConfig) checkConfigSanity() error { if self.HistoryAgentEnabled && !self.HistoryServerEnabled { return errors.New("HistoryServer not enabled but referenced by HistoryAgent component") } + // PubSubAgent + if self.PubSubAgentEnabled && !self.PubSubServerEnabled { + return errors.New("PubSubServer not enabled but referenced by PubSubAgent component") + } return nil } @@ -411,6 +419,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } + jsnPubSubServCfg, err := jsnCfg.PubSubServJsonCfg() + if err != nil { + return err + } + + jsnPubSubAgentCfg, err := jsnCfg.PubSubAgentJsonCfg() + if err != nil { + return err + } + jsnMailerCfg, err := jsnCfg.MailerJsonCfg() if err != nil { return err @@ -678,6 +696,26 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } } + if jsnPubSubAgentCfg != nil { + if jsnPubSubAgentCfg.Enabled != nil { + self.PubSubAgentEnabled = *jsnPubSubAgentCfg.Enabled + } + if jsnPubSubAgentCfg.Server != nil { + self.PubSubServer = *jsnPubSubAgentCfg.Server + } + } + + if jsnPubSubServCfg != nil { + if jsnPubSubServCfg.Enabled != nil { + self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled + } + if jsnPubSubServCfg.Save_interval != nil { + if self.PubSubSaveInterval, err = utils.ParseDurationWithSecs(*jsnPubSubServCfg.Save_interval); err != nil { + return err + } + } + } + if jsnMailerCfg != nil { if jsnMailerCfg.Server != nil { self.MailerServer = *jsnMailerCfg.Server diff --git a/config/config_defaults.go b/config/config_defaults.go index 5d59cf5d8..786f57250 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -243,6 +243,17 @@ const CGRATES_CFG_JSON = ` "server": "internal", // address where to reach the master history server: }, +"pubsub_server": { + "enabled": false, // starts History service: . + "save_interval": "1s", // interval to save changed cache into .git archive +}, + + +"pubsub_agent": { + "enabled": false, // starts PubSub as a client: . + "server": "internal", // address where to reach the master pubsub server: +}, + "mailer": { "server": "localhost", // the server to use when sending emails out diff --git a/config/config_json.go b/config/config_json.go index 6ccb5caea..7ef0ca3bd 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -20,9 +20,10 @@ package config import ( "encoding/json" - "github.com/DisposaBoy/JsonConfigReader" "io" "os" + + "github.com/DisposaBoy/JsonConfigReader" ) const ( @@ -48,6 +49,8 @@ const ( OSIPS_JSN = "opensips" HISTSERV_JSN = "history_server" HISTAGENT_JSN = "history_agent" + PUBSUBSERV_JSN = "pubsub_server" + PUBSUBAGENT_JSN = "pubsub_agent" MAILER_JSN = "mailer" ) @@ -254,6 +257,30 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) { return cfg, nil } +func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) { + rawCfg, hasKey := self[HISTSERV_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubServJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + +func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) { + rawCfg, hasKey := self[HISTAGENT_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubAgentJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) { rawCfg, hasKey := self[MAILER_JSN] if !hasKey { diff --git a/config/config_json_test.go b/config/config_json_test.go index 8faa57124..cde9a43d5 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -406,6 +406,30 @@ func TestDfHistAgentJsonCfg(t *testing.T) { } } +func TestDfPubSubServJsonCfg(t *testing.T) { + eCfg := &PubSubServJsonCfg{ + Enabled: utils.BoolPointer(false), + Save_interval: utils.StringPointer("1s"), + } + if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + +func TestDfPubSubAgentJsonCfg(t *testing.T) { + eCfg := &PubSubAgentJsonCfg{ + Enabled: utils.BoolPointer(false), + Server: utils.StringPointer("internal"), + } + if cfg, err := dfCgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + func TestDfMailerJsonCfg(t *testing.T) { eCfg := &MailerJsonCfg{ Server: utils.StringPointer("localhost"), @@ -463,6 +487,11 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) { } else if cfg != nil { t.Error("Received: ", cfg) } + if cfg, err := cgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if cfg != nil { + t.Error("Received: ", cfg) + } eCfgSmFs := &SmFsJsonCfg{ Enabled: utils.BoolPointer(true), Connections: &[]*FsConnJsonCfg{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4df2fe490..cb1a37c80 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -217,6 +217,18 @@ type HistAgentJsonCfg struct { Server *string } +// PubSub server config section +type PubSubServJsonCfg struct { + Enabled *bool + Save_interval *string +} + +// PubSub agent config section +type PubSubAgentJsonCfg struct { + Enabled *bool + Server *string +} + // Mailer config section type MailerJsonCfg struct { Server *string diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 9fab1711f..905f73855 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -222,6 +222,16 @@ // "server": "internal", // address where to reach the master history server: //}, +//"pubsub_server": { +// "enabled": false, // starts pubsub service: . +// "save_interval": "1s", // interval to save subscribers +//}, + + +//"pubsub_agent": { +// "enabled": false, // starts pubsub as a client: . +// "server": "internal", // address where to reach the master pubsub server: +//}, //"mailer": { // "server": "localhost", // the server to use when sending emails out diff --git a/engine/calldesc.go b/engine/calldesc.go index 01ba1da85..65ecfdd58 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -21,15 +21,15 @@ package engine import ( "errors" "fmt" - //"log" "log/syslog" "sort" "strings" "time" - //"encoding/json" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/utils" ) @@ -68,6 +68,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 10 historyScribe history.Scribe + pubSubServer pubsub.PublisherSubscriber //historyScribe, _ = history.NewMockScribe() ) @@ -104,6 +105,10 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } +func SetPubSub(ps pubsub.PublisherSubscriber) { + pubSubServer = ps +} + /* The input stucture that contains call information. */ diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index ce4b05886..28f890517 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -2,12 +2,9 @@ package pubsub import ( "errors" - "fmt" "sync" "time" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -32,14 +29,14 @@ type PublisherSubscriber interface { type PubSub struct { subscribers map[string]map[string]time.Time - conf *config.CGRConfig + ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex } -func NewPubSub(conf *config.CGRConfig) *PubSub { +func NewPubSub(ttlVerify bool) *PubSub { return &PubSub{ - conf: conf, + ttlVerify: ttlVerify, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, @@ -84,7 +81,6 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { for transport_address, expTime := range subs { split := utils.InfieldSplit(transport_address) if len(split) != 2 { - engine.Logger.Warning(" Wrong transport;address pair: " + transport_address) continue } transport := split[0] @@ -98,11 +94,8 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { + if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { break // Success, no need to reinterate - } else if i == 4 { // Last iteration, syslog the warning - engine.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) - break } time.Sleep(delay()) } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 74c964dba..71a94fe64 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -4,12 +4,11 @@ import ( "testing" "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) func TestSubscribe(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -25,7 +24,7 @@ func TestSubscribe(t *testing.T) { } func TestSubscribeNoTransport(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -38,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) { } func TestSubscribeNoExpire(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -54,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -77,7 +76,7 @@ func TestUnsubscribe(t *testing.T) { } func TestPublish(t *testing.T) { - ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps := NewPubSub(true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { obj.(map[string]string)["called"] = url return nil, nil @@ -111,7 +110,7 @@ func TestPublish(t *testing.T) { } func TestPublishExpired(t *testing.T) { - ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps := NewPubSub(true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { m := obj.(map[string]string) m["called"] = "yes"