diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d18d538b1..693fdf5b5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -67,6 +67,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe + pubSubServer engine.PublisherSubscriber cdrServer *engine.CdrServer cdrStats *engine.Stats cfg *config.CGRConfig @@ -336,7 +337,7 @@ func startHistoryServer(chanDone chan struct{}) { // chanStartServer will report when server is up, useful for internal requests func startHistoryAgent(chanServerStarted chan struct{}) { if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting - engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + //engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) select { case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) @@ -363,6 +364,45 @@ func startHistoryAgent(chanServerStarted chan struct{}) { return } +func startPubSubServer(chanDone chan struct{}, accountDb engine.AccountingStorage) { + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("PubSubV1", pubSubServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startPubSubAgent(chanServerStarted chan struct{}, accountDb engine.AccountingStorage) { + if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting + select { + case <-time.After(1 * time.Minute): + engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) + exitChan <- true + return + case <-chanServerStarted: + } + //<-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here + delay := utils.Fib() + for i := 0; i < 3; i++ { //ToDo: Make it globally configurable + //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err == nil { + break //Connected so no need to reiterate + } else if i == 2 && err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + exitChan <- true + return + } + time.Sleep(delay()) + } + } + engine.SetPubSub(pubSubServer) // scribeServer comes from global variable + return +} + // Starts the rpc server, waiting for the necessary components to finish their tasks func serveRpc(rpcWaitChans []chan struct{}) { for _, chn := range rpcWaitChans { @@ -579,6 +619,18 @@ func main() { go startHistoryAgent(histServChan) } + var pubsubServChan chan struct{} // Will be initialized only if the server starts + if cfg.PubSubServerEnabled { + pubsubServChan = make(chan struct{}) + rpcWait = append(rpcWait, pubsubServChan) + go startPubSubServer(pubsubServChan, accountDb) + } + + if cfg.PubSubAgentEnabled { + engine.Logger.Info("Starting CGRateS PubSub Agent.") + go startPubSubAgent(pubsubServChan, accountDb) + } + var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index a929eaada..97837b68b 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -49,7 +49,9 @@ func generalSignalHandler() { sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig)) var dummyInt int - cdrStats.Stop(dummyInt, &dummyInt) + if cdrStats != nil { + cdrStats.Stop(dummyInt, &dummyInt) + } exitChan <- true } diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 174d50c01..1c3999a3d 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "net/rpc" + "net/rpc/jsonrpc" "os" "runtime" "runtime/pprof" @@ -58,6 +59,7 @@ var ( tenant = flag.String("tenant", "cgrates.org", "The type of record to use in queries.") subject = flag.String("subject", "1001", "The rating subject to use in queries.") destination = flag.String("destination", "1002", "The destination to use in queries.") + json = flag.Bool("json", false, "Use JSON RPC") nilDuration = time.Duration(0) ) @@ -107,7 +109,14 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) { result := engine.CallCost{} - client, err := rpc.Dial("tcp", *raterAddress) + var client *rpc.Client + var err error + if *json { + client, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + client, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { return nilDuration, fmt.Errorf("Could not connect to engine: %s", err.Error()) } diff --git a/config/config.go b/config/config.go index 989b0f655..e78aad64a 100644 --- a/config/config.go +++ b/config/config.go @@ -213,7 +213,10 @@ type CGRConfig struct { HistoryServer string // Address where to reach the master history server: HistoryServerEnabled bool // Starts History as server: . HistoryDir string // Location on disk where to store history files. - HistorySaveInterval time.Duration // The timout duration between history writes + HistorySaveInterval time.Duration // The timout duration between pubsub writes + PubSubAgentEnabled bool // Starts PubSub as an agent: . + PubSubServer string // Address where to reach the master pubsub server: + PubSubServerEnabled bool // Starts PubSub as server: . MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user MailerAuthPass string // Authenticate to email server with this password @@ -319,6 +322,10 @@ func (self *CGRConfig) checkConfigSanity() error { if self.HistoryAgentEnabled && !self.HistoryServerEnabled { return errors.New("HistoryServer not enabled but referenced by HistoryAgent component") } + // PubSubAgent + if self.PubSubAgentEnabled && !self.PubSubServerEnabled { + return errors.New("PubSubServer not enabled but referenced by PubSubAgent component") + } return nil } @@ -411,6 +418,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } + jsnPubSubServCfg, err := jsnCfg.PubSubServJsonCfg() + if err != nil { + return err + } + + jsnPubSubAgentCfg, err := jsnCfg.PubSubAgentJsonCfg() + if err != nil { + return err + } + jsnMailerCfg, err := jsnCfg.MailerJsonCfg() if err != nil { return err @@ -678,6 +695,21 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } } + if jsnPubSubAgentCfg != nil { + if jsnPubSubAgentCfg.Enabled != nil { + self.PubSubAgentEnabled = *jsnPubSubAgentCfg.Enabled + } + if jsnPubSubAgentCfg.Server != nil { + self.PubSubServer = *jsnPubSubAgentCfg.Server + } + } + + if jsnPubSubServCfg != nil { + if jsnPubSubServCfg.Enabled != nil { + self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled + } + } + if jsnMailerCfg != nil { if jsnMailerCfg.Server != nil { self.MailerServer = *jsnMailerCfg.Server diff --git a/config/config_defaults.go b/config/config_defaults.go index 5d59cf5d8..786f57250 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -243,6 +243,17 @@ const CGRATES_CFG_JSON = ` "server": "internal", // address where to reach the master history server: }, +"pubsub_server": { + "enabled": false, // starts History service: . + "save_interval": "1s", // interval to save changed cache into .git archive +}, + + +"pubsub_agent": { + "enabled": false, // starts PubSub as a client: . + "server": "internal", // address where to reach the master pubsub server: +}, + "mailer": { "server": "localhost", // the server to use when sending emails out diff --git a/config/config_json.go b/config/config_json.go index 6ccb5caea..213ff65c2 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -20,9 +20,10 @@ package config import ( "encoding/json" - "github.com/DisposaBoy/JsonConfigReader" "io" "os" + + "github.com/DisposaBoy/JsonConfigReader" ) const ( @@ -48,6 +49,8 @@ const ( OSIPS_JSN = "opensips" HISTSERV_JSN = "history_server" HISTAGENT_JSN = "history_agent" + PUBSUBSERV_JSN = "pubsub_server" + PUBSUBAGENT_JSN = "pubsub_agent" MAILER_JSN = "mailer" ) @@ -254,6 +257,30 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) { return cfg, nil } +func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) { + rawCfg, hasKey := self[PUBSUBSERV_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubServJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + +func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) { + rawCfg, hasKey := self[PUBSUBAGENT_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubAgentJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) { rawCfg, hasKey := self[MAILER_JSN] if !hasKey { diff --git a/config/config_json_test.go b/config/config_json_test.go index 8faa57124..960fe8862 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -406,6 +406,29 @@ func TestDfHistAgentJsonCfg(t *testing.T) { } } +func TestDfPubSubServJsonCfg(t *testing.T) { + eCfg := &PubSubServJsonCfg{ + Enabled: utils.BoolPointer(false), + } + if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + +func TestDfPubSubAgentJsonCfg(t *testing.T) { + eCfg := &PubSubAgentJsonCfg{ + Enabled: utils.BoolPointer(false), + Server: utils.StringPointer("internal"), + } + if cfg, err := dfCgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + func TestDfMailerJsonCfg(t *testing.T) { eCfg := &MailerJsonCfg{ Server: utils.StringPointer("localhost"), @@ -463,6 +486,11 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) { } else if cfg != nil { t.Error("Received: ", cfg) } + if cfg, err := cgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if cfg != nil { + t.Error("Received: ", cfg) + } eCfgSmFs := &SmFsJsonCfg{ Enabled: utils.BoolPointer(true), Connections: &[]*FsConnJsonCfg{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4df2fe490..f01c7dd77 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -217,6 +217,17 @@ type HistAgentJsonCfg struct { Server *string } +// PubSub server config section +type PubSubServJsonCfg struct { + Enabled *bool +} + +// PubSub agent config section +type PubSubAgentJsonCfg struct { + Enabled *bool + Server *string +} + // Mailer config section type MailerJsonCfg struct { Server *string diff --git a/console/publish.go b/console/publish.go new file mode 100644 index 000000000..a407c8023 --- /dev/null +++ b/console/publish.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdPublish{ + name: "publish", + rpcMethod: "PubSubV1.Publish", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdPublish struct { + name string + rpcMethod string + rpcParams *engine.PublishInfo + *CommandExecuter +} + +func (self *CmdPublish) Name() string { + return self.name +} + +func (self *CmdPublish) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdPublish) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.PublishInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdPublish) PostprocessRpcParams() error { + return nil +} + +func (self *CmdPublish) RpcResult() interface{} { + var s string + return &s +} diff --git a/console/show_subscribers.go b/console/show_subscribers.go new file mode 100644 index 000000000..7ddae5af0 --- /dev/null +++ b/console/show_subscribers.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdShowSubscribers{ + name: "show_subscribers", + rpcMethod: "PubSubV1.ShowSubscribers", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdShowSubscribers struct { + name string + rpcMethod string + rpcParams *StringWrapper + *CommandExecuter +} + +func (self *CmdShowSubscribers) Name() string { + return self.name +} + +func (self *CmdShowSubscribers) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdShowSubscribers) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &StringWrapper{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdShowSubscribers) PostprocessRpcParams() error { + return nil +} + +func (self *CmdShowSubscribers) RpcResult() interface{} { + var s map[string]map[string]*engine.SubscriberData + return &s +} diff --git a/console/subscribe.go b/console/subscribe.go new file mode 100644 index 000000000..ee8c27bd7 --- /dev/null +++ b/console/subscribe.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdSubscribe{ + name: "subscribe", + rpcMethod: "PubSubV1.Subscribe", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdSubscribe struct { + name string + rpcMethod string + rpcParams *engine.SubscribeInfo + *CommandExecuter +} + +func (self *CmdSubscribe) Name() string { + return self.name +} + +func (self *CmdSubscribe) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdSubscribe) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.SubscribeInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdSubscribe) PostprocessRpcParams() error { + return nil +} + +func (self *CmdSubscribe) RpcResult() interface{} { + var s string + return &s +} diff --git a/console/unsubscribe.go b/console/unsubscribe.go new file mode 100644 index 000000000..66f02d741 --- /dev/null +++ b/console/unsubscribe.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdUnsubscribe{ + name: "unsubscribe", + rpcMethod: "PubSubV1.Unsubscribe", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdUnsubscribe struct { + name string + rpcMethod string + rpcParams *engine.SubscribeInfo + *CommandExecuter +} + +func (self *CmdUnsubscribe) Name() string { + return self.name +} + +func (self *CmdUnsubscribe) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdUnsubscribe) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.SubscribeInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdUnsubscribe) PostprocessRpcParams() error { + return nil +} + +func (self *CmdUnsubscribe) RpcResult() interface{} { + var s string + return &s +} diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 9fab1711f..6b1a4b031 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -222,6 +222,15 @@ // "server": "internal", // address where to reach the master history server: //}, +//"pubsub_server": { +// "enabled": false, // starts pubsub service: . +//}, + + +//"pubsub_agent": { +// "enabled": false, // starts pubsub as a client: . +// "server": "internal", // address where to reach the master pubsub server: +//}, //"mailer": { // "server": "localhost", // the server to use when sending emails out diff --git a/data/conf/samples/cgradmin/cgradmin.json b/data/conf/samples/cgradmin/cgradmin.json index 46f6fae5d..158e85047 100644 --- a/data/conf/samples/cgradmin/cgradmin.json +++ b/data/conf/samples/cgradmin/cgradmin.json @@ -17,4 +17,14 @@ "scheduler": { "enabled": true, // start Scheduler service: }, + +"pubsub_server": { + "enabled": true, // starts pubsub service: . +}, + + +"pubsub_agent": { + "enabled": true, // starts pubsub as a client: . + "server": "internal", // address where to reach the master pubsub server: +}, } diff --git a/engine/calldesc.go b/engine/calldesc.go index 01ba1da85..2e45d3c54 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -21,13 +21,12 @@ package engine import ( "errors" "fmt" - //"log" "log/syslog" "sort" "strings" "time" - //"encoding/json" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" @@ -68,6 +67,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 10 historyScribe history.Scribe + pubSubServer PublisherSubscriber //historyScribe, _ = history.NewMockScribe() ) @@ -104,6 +104,10 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } +func SetPubSub(ps PublisherSubscriber) { + pubSubServer = ps +} + /* The input stucture that contains call information. */ diff --git a/engine/pubsub.go b/engine/pubsub.go new file mode 100644 index 000000000..6784f423d --- /dev/null +++ b/engine/pubsub.go @@ -0,0 +1,184 @@ +package engine + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +type SubscribeInfo struct { + EventName string + EventFilter string + Transport string + Address string + LifeSpan time.Duration +} + +type PublishInfo struct { + Event map[string]string +} + +type PublisherSubscriber interface { + Subscribe(SubscribeInfo, *string) error + Unsubscribe(SubscribeInfo, *string) error + Publish(PublishInfo, *string) error + ShowSubscribers(string, *map[string]map[string]*SubscriberData) error +} + +type SubscriberData struct { + ExpTime time.Time + Filters utils.RSRFields +} + +type PubSub struct { + subscribers map[string]map[string]*SubscriberData + ttlVerify bool + pubFunc func(string, bool, interface{}) ([]byte, error) + mux *sync.Mutex + accountDb AccountingStorage +} + +func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { + ps := &PubSub{ + ttlVerify: ttlVerify, + subscribers: make(map[string]map[string]*SubscriberData), + pubFunc: utils.HttpJsonPost, + mux: &sync.Mutex{}, + accountDb: accountDb, + } + // load subscribers + if subs, err := accountDb.GetPubSubSubscribers(); err == nil { + ps.subscribers = subs + } + return ps +} + +func (ps *PubSub) saveSubscribers(key string) { + if key != "" { + if _, found := ps.subscribers[key]; !found { + return + } + if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } else { // save all + for key, valueMap := range ps.subscribers { + if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } + } +} + +func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + if ps.subscribers[si.EventName] == nil { + ps.subscribers[si.EventName] = make(map[string]*SubscriberData) + } + var expTime time.Time + if si.LifeSpan > 0 { + expTime = time.Now().Add(si.LifeSpan) + } + rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP) + if err != nil { + *reply = err.Error() + return err + } + ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{ + ExpTime: expTime, + Filters: rsr, + } + ps.saveSubscribers(si.EventName) + *reply = utils.OK + return nil +} + +func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) + ps.saveSubscribers(si.EventName) + *reply = utils.OK + return nil +} + +func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { + ps.mux.Lock() + defer ps.mux.Unlock() + subs := ps.subscribers[pi.Event["EventName"]] + for transportAddress, subData := range subs { + split := utils.InfieldSplit(transportAddress) + if len(split) != 2 { + Logger.Warning(" Wrong transport;address pair: " + transportAddress) + continue + } + transport := split[0] + address := split[1] + if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { + delete(subs, transportAddress) + ps.saveSubscribers(pi.Event["EventName"]) + continue // subscription expired, do not send event + } + switch transport { + case utils.META_HTTP_POST: + go func() { + delay := utils.Fib() + for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort + if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { + break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + break + } + time.Sleep(delay()) + } + }() + } + } + *reply = utils.OK + return nil +} + +func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error { + *out = ps.subscribers + return nil +} + +type ProxyPubSub struct { + Client *rpcclient.RpcClient +} + +func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) { + client, err := rpcclient.NewRpcClient("tcp", addr, reconnects, utils.GOB) + if err != nil { + return nil, err + } + return &ProxyPubSub{Client: client}, nil +} + +func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSubV1.Subscribe", si, reply) +} +func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSubV1.Unsubscribe", si, reply) +} +func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { + return ps.Client.Call("PubSubV1.Publish", pi, reply) +} + +func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error { + return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) +} diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go new file mode 100644 index 000000000..6e027a7b8 --- /dev/null +++ b/engine/pubsub_test.go @@ -0,0 +1,208 @@ +package engine + +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func TestSubscribe(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestSubscribeSave(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 1 { + t.Error("Error saving subscribers: ", err, subs) + } +} + +func TestSubscribeNoTransport(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: "test", + Address: "url", + LifeSpan: time.Second, + }, &r); err == nil { + t.Error("Error subscribing error: ", err) + } +} + +func TestSubscribeNoExpire(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 0, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() { + t.Error("Error adding no expire subscriber: ", ps.subscribers) + } +} + +func TestUnsubscribe(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Unsubscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + }, &r); err != nil { + t.Error("Error unsubscribing: ", err) + } + if _, exists := ps.subscribers["test"]["url"]; exists { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestUnsubscribeSave(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Unsubscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + }, &r); err != nil { + t.Error("Error unsubscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 0 { + t.Error("Error saving subscribers: ", err, subs) + } +} + +func TestPublish(t *testing.T) { + ps := NewPubSub(accountingStorage, true) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + obj.(map[string]string)["called"] = url + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + m := make(map[string]string) + m["EventName"] = "test" + if err := ps.Publish(PublishInfo{ + Event: m, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + for i := 0; i < 1000; i++ { // wait for the theread to populate map + if len(m) == 1 { + time.Sleep(time.Microsecond) + } else { + break + } + } + if r, exists := m["called"]; !exists || r != "url" { + t.Error("Error calling publish function: ", m) + } +} + +func TestPublishExpired(t *testing.T) { + ps := NewPubSub(accountingStorage, true) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + m := obj.(map[string]string) + m["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Publish(PublishInfo{ + Event: map[string]string{"EventName": "test"}, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + if len(ps.subscribers["test"]) != 0 { + t.Error("Error removing expired subscribers: ", ps.subscribers) + } +} + +func TestPublishExpiredSave(t *testing.T) { + ps := NewPubSub(accountingStorage, true) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + m := obj.(map[string]string) + m["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 1 { + t.Error("Error saving subscribers: ", err, subs) + } + if err := ps.Publish(PublishInfo{ + Event: map[string]string{"EventName": "test"}, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + subs, err = accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 0 { + t.Error("Error saving subscribers: ", err, subs) + } +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 894905304..faa8fc560 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -79,6 +79,8 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error + GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error) + SetPubSubSubscribers(string, map[string]*SubscriberData) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index e350c5ff6..b729bd102 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,6 +561,24 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { + result = make(map[string]map[string]*SubscriberData) + for key, value := range ms.dict { + if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { + subs := make(map[string]*SubscriberData) + if err = ms.ms.Unmarshal(value, &subs); err == nil { + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } + } + } + return +} +func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { + result, err := ms.ms.Marshal(subs) + ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result + return +} + func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) { if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 41bb26b74..c571d7622 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -689,6 +689,30 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { + keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") + if err != nil { + return nil, err + } + result = make(map[string]map[string]*SubscriberData) + for _, key := range keys { + if values, err := rs.db.Get(key); err == nil { + subs := make(map[string]*SubscriberData) + err = rs.ms.Unmarshal(values, &subs) + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } else { + return nil, utils.ErrNotFound + } + } + return +} + +func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { + result, err := rs.ms.Marshal(subs) + rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) + return +} + func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) { var values []byte if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil { diff --git a/test.sh b/test.sh index dbaf2189e..48c9fa8d4 100755 --- a/test.sh +++ b/test.sh @@ -38,4 +38,3 @@ go test github.com/cgrates/cgrates/cdre cdre=$? exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre - diff --git a/utils/consts.go b/utils/consts.go index 5a11dc0c0..458b426e1 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -165,6 +165,7 @@ const ( LCR_PREFIX = "lcr_" DERIVEDCHARGERS_PREFIX = "dcs_" CDR_STATS_QUEUE_PREFIX = "csq_" + PUBSUB_SUBSCRIBERS_PREFIX = "pss_" CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" LOG_CALL_COST_PREFIX = "cco_" diff --git a/utils/coreutils.go b/utils/coreutils.go index de3c36194..1788aeab4 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -274,6 +274,14 @@ func AccountAliasKey(tenant, account string) string { return ConcatenatedKey(tenant, account) } +func InfieldJoin(vals ...string) string { + return strings.Join(vals, INFIELD_SEP) +} + +func InfieldSplit(val string) []string { + return strings.Split(val, INFIELD_SEP) +} + func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) { body, err := json.Marshal(content) if err != nil {