From e99b3ff16c443b63a21a17be237d7d5f40b26fb7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 30 Jun 2015 19:09:15 +0300 Subject: [PATCH 01/14] pubsub first draft --- pubsub/pubsub.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 pubsub/pubsub.go diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 000000000..f0f76dd07 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,93 @@ +package pubsub + +import ( + "fmt" + "time" + + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +type SubscribeInfo struct { + EventType string + PostUrl string + LiveDuration time.Duartion +} + +type PublishInfo struct { + EventType string + Event map[string]string +} + +type PublishSubscriber interface { + Subscribe(SubscribeInfo, *string) error + Unsubscribe(SubscribeInfo, *string) error + Publish(PublishInfo, *string) error +} + +type PubSub struct { + subscribers map[string]map[string]time.Time + conf *CGRConfig +} + +func NewPubSub(conf *CGRConfig) *PubSub { + return &PubSub{ + conf: conf, + subscribers: make(map[string]map[string]time.Time), + } +} + +func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { + if ps.subscribers[si.EventType] == nil { + ps.subscribers[si.EventType] = make(map[string]time.Time) + } + ps.subscribers[si.EventType][si.PostUrl] = time.Now().Add(si.LiveDuration) + *reply = utils.OK + return nil +} + +func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { + delete(ps.subscribers[si.EventType], si.PostUrl) + *reply = utils.OK + return nil +} + +func (ps *PubSub) Publish(pi PublishInfo, replay *string) error { + subs := ps.subscribers[pi.EventType] + for postURL, expTime := range subs { + if expTime.After(time.Now) { + delete(subs, postURL) + continue // subscription expired, do not send event + } + url := postURL + go func() { + delay := utils.Fib() + for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort + if _, err = utils.HttpJsonPost(url, ps.cfg.HttpSkipTlsVerify, pi.Event); err == nil { + break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) + break + } + time.Sleep(delay()) + } + }() + } + return nil +} + +type ProxyPubSub struct { + Client *rpcclient.RpcClient +} + +func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) { + client, err := rpcclient.NewRpcClient("tcp", addr, reconnects, utils.GOB) + if err != nil { + return nil, err + } + return &ProxyPubSub{Client: client}, nil +} + +func (ps *ProxyPubSub) Subscribe(sqID string, values *map[string]float64) error { + return ps.Client.Call("PubSub.Subscribe", sqID, values) +} From 173b025d8f690e81aa552ea6037536a410a154b0 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 30 Jun 2015 22:54:06 +0300 Subject: [PATCH 02/14] first pubsub tests --- pubsub/pubsub.go | 25 ++++++--- pubsub/pubsub_test.go | 118 ++++++++++++++++++++++++++++++++++++++++++ test.sh | 6 ++- 3 files changed, 139 insertions(+), 10 deletions(-) create mode 100644 pubsub/pubsub_test.go diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index f0f76dd07..1b57334df 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -11,7 +13,7 @@ import ( type SubscribeInfo struct { EventType string PostUrl string - LiveDuration time.Duartion + LiveDuration time.Duration } type PublishInfo struct { @@ -27,13 +29,15 @@ type PublishSubscriber interface { type PubSub struct { subscribers map[string]map[string]time.Time - conf *CGRConfig + conf *config.CGRConfig + pubFunc func(string, bool, interface{}) ([]byte, error) } -func NewPubSub(conf *CGRConfig) *PubSub { +func NewPubSub(conf *config.CGRConfig) *PubSub { return &PubSub{ conf: conf, subscribers: make(map[string]map[string]time.Time), + pubFunc: utils.HttpJsonPost, } } @@ -41,7 +45,11 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { if ps.subscribers[si.EventType] == nil { ps.subscribers[si.EventType] = make(map[string]time.Time) } - ps.subscribers[si.EventType][si.PostUrl] = time.Now().Add(si.LiveDuration) + var expTime time.Time + if si.LiveDuration > 0 { + expTime = time.Now().Add(si.LiveDuration) + } + ps.subscribers[si.EventType][si.PostUrl] = expTime *reply = utils.OK return nil } @@ -52,10 +60,10 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return nil } -func (ps *PubSub) Publish(pi PublishInfo, replay *string) error { +func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { subs := ps.subscribers[pi.EventType] for postURL, expTime := range subs { - if expTime.After(time.Now) { + if !expTime.IsZero() && expTime.Before(time.Now()) { delete(subs, postURL) continue // subscription expired, do not send event } @@ -63,16 +71,17 @@ func (ps *PubSub) Publish(pi PublishInfo, replay *string) error { go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err = utils.HttpJsonPost(url, ps.cfg.HttpSkipTlsVerify, pi.Event); err == nil { + if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { break // Success, no need to reinterate } else if i == 4 { // Last iteration, syslog the warning - Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) + engine.Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) break } time.Sleep(delay()) } }() } + *reply = utils.OK return nil } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 000000000..681a2d105 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,118 @@ +package pubsub + +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/config" +) + +func TestSubscribe(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestSubscribeNoExpire(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: 0, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() { + t.Error("Error adding no expire subscriber: ", ps.subscribers) + } +} + +func TestUnsubscribe(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Unsubscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + }, &r); err != nil { + t.Error("Error unsubscribing: ", err) + } + if _, exists := ps.subscribers["test"]["url"]; exists { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestPublish(t *testing.T) { + ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + obj.(map[string]string)["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + m := make(map[string]string) + if err := ps.Publish(PublishInfo{ + EventType: "test", + Event: m, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + for i := 0; i < 1000; i++ { // wait for the theread to populate map + if len(m) == 0 { + time.Sleep(time.Microsecond) + } else { + break + } + } + if r, exists := m["called"]; !exists || r != "yes" { + t.Error("Error calling publish function: ", m) + } +} + +func TestPublishExpired(t *testing.T) { + ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + m := obj.(map[string]string) + m["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: 1, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Publish(PublishInfo{ + EventType: "test", + Event: nil, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + if len(ps.subscribers["test"]) != 0 { + t.Error("Error removing expired subscribers: ", ps.subscribers) + } +} diff --git a/test.sh b/test.sh index dbaf2189e..7c605b910 100755 --- a/test.sh +++ b/test.sh @@ -11,6 +11,7 @@ go test -i github.com/cgrates/cgrates/cdrc go test -i github.com/cgrates/cgrates/utils go test -i github.com/cgrates/cgrates/history go test -i github.com/cgrates/cgrates/cdre +go test -i github.com/cgrates/cgrates/pubsub go test github.com/cgrates/cgrates/apier/v1 v1=$? @@ -36,6 +37,7 @@ go test github.com/cgrates/cgrates/cache2go c2g=$? go test github.com/cgrates/cgrates/cdre cdre=$? +go test github.com/cgrates/cgrates/pubsub +ps=$? -exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre - +exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre && $ps From 66ac4b194c3df5a0c994bdba1b21c30db5e47fa5 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 12:03:15 +0300 Subject: [PATCH 03/14] added transport and sync locks --- pubsub/pubsub.go | 92 +++++++++++++++++++++++++++++-------------- pubsub/pubsub_test.go | 71 +++++++++++++++++++++------------ utils/coreutils.go | 8 ++++ 3 files changed, 116 insertions(+), 55 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 1b57334df..ce4b05886 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -1,7 +1,9 @@ package pubsub import ( + "errors" "fmt" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -11,17 +13,18 @@ import ( ) type SubscribeInfo struct { - EventType string - PostUrl string - LiveDuration time.Duration + EventName string + EventFilter string + Transport string + Address string + LifeSpan time.Duration } type PublishInfo struct { - EventType string - Event map[string]string + Event map[string]string } -type PublishSubscriber interface { +type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error @@ -31,6 +34,7 @@ type PubSub struct { subscribers map[string]map[string]time.Time conf *config.CGRConfig pubFunc func(string, bool, interface{}) ([]byte, error) + mux *sync.Mutex } func NewPubSub(conf *config.CGRConfig) *PubSub { @@ -38,48 +42,72 @@ func NewPubSub(conf *config.CGRConfig) *PubSub { conf: conf, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, + mux: &sync.Mutex{}, } } func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { - if ps.subscribers[si.EventType] == nil { - ps.subscribers[si.EventType] = make(map[string]time.Time) + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + if ps.subscribers[si.EventName] == nil { + ps.subscribers[si.EventName] = make(map[string]time.Time) } var expTime time.Time - if si.LiveDuration > 0 { - expTime = time.Now().Add(si.LiveDuration) + if si.LifeSpan > 0 { + expTime = time.Now().Add(si.LifeSpan) } - ps.subscribers[si.EventType][si.PostUrl] = expTime + ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime *reply = utils.OK return nil } func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { - delete(ps.subscribers[si.EventType], si.PostUrl) + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) *reply = utils.OK return nil } func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { - subs := ps.subscribers[pi.EventType] - for postURL, expTime := range subs { + ps.mux.Lock() + defer ps.mux.Unlock() + subs := ps.subscribers[pi.Event["EventName"]] + for transport_address, expTime := range subs { + split := utils.InfieldSplit(transport_address) + if len(split) != 2 { + engine.Logger.Warning(" Wrong transport;address pair: " + transport_address) + continue + } + transport := split[0] + address := split[1] if !expTime.IsZero() && expTime.Before(time.Now()) { - delete(subs, postURL) + delete(subs, transport_address) continue // subscription expired, do not send event } - url := postURL - go func() { - delay := utils.Fib() - for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { - break // Success, no need to reinterate - } else if i == 4 { // Last iteration, syslog the warning - engine.Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) - break + switch transport { + case utils.META_HTTP_POST: + go func() { + delay := utils.Fib() + for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort + if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { + break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + engine.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + break + } + time.Sleep(delay()) } - time.Sleep(delay()) - } - }() + }() + } } *reply = utils.OK return nil @@ -97,6 +125,12 @@ func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) { return &ProxyPubSub{Client: client}, nil } -func (ps *ProxyPubSub) Subscribe(sqID string, values *map[string]float64) error { - return ps.Client.Call("PubSub.Subscribe", sqID, values) +func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSub.Subscribe", si, reply) +} +func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSub.Unsubscribe", si, reply) +} +func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { + return ps.Client.Call("PubSub.Publish", pi, reply) } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 681a2d105..74c964dba 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -5,34 +5,50 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" ) func TestSubscribe(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() { + if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() { t.Error("Error adding subscriber: ", ps.subscribers) } } +func TestSubscribeNoTransport(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: "test", + Address: "url", + LifeSpan: time.Second, + }, &r); err == nil { + t.Error("Error subscribing error: ", err) + } +} + func TestSubscribeNoExpire(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: 0, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 0, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() { + if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() { t.Error("Error adding no expire subscriber: ", ps.subscribers) } } @@ -41,15 +57,17 @@ func TestUnsubscribe(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Unsubscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", }, &r); err != nil { t.Error("Error unsubscribing: ", err) } @@ -61,32 +79,33 @@ func TestUnsubscribe(t *testing.T) { func TestPublish(t *testing.T) { ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - obj.(map[string]string)["called"] = "yes" + obj.(map[string]string)["called"] = url return nil, nil } var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } m := make(map[string]string) + m["EventName"] = "test" if err := ps.Publish(PublishInfo{ - EventType: "test", - Event: m, + Event: m, }, &r); err != nil { t.Error("Error publishing: ", err) } for i := 0; i < 1000; i++ { // wait for the theread to populate map - if len(m) == 0 { + if len(m) == 1 { time.Sleep(time.Microsecond) } else { break } } - if r, exists := m["called"]; !exists || r != "yes" { + if r, exists := m["called"]; !exists || r != "url" { t.Error("Error calling publish function: ", m) } } @@ -100,15 +119,15 @@ func TestPublishExpired(t *testing.T) { } var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: 1, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Publish(PublishInfo{ - EventType: "test", - Event: nil, + Event: map[string]string{"EventName": "test"}, }, &r); err != nil { t.Error("Error publishing: ", err) } diff --git a/utils/coreutils.go b/utils/coreutils.go index de3c36194..1788aeab4 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -274,6 +274,14 @@ func AccountAliasKey(tenant, account string) string { return ConcatenatedKey(tenant, account) } +func InfieldJoin(vals ...string) string { + return strings.Join(vals, INFIELD_SEP) +} + +func InfieldSplit(val string) []string { + return strings.Split(val, INFIELD_SEP) +} + func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) { body, err := json.Marshal(content) if err != nil { From a3affa32b1b2a73048fe71882c14f3a87ce0edf8 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 16:47:54 +0300 Subject: [PATCH 04/14] json rpc for cgr tester --- cmd/cgr-tester/cgr-tester.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 174d50c01..1c3999a3d 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "net/rpc" + "net/rpc/jsonrpc" "os" "runtime" "runtime/pprof" @@ -58,6 +59,7 @@ var ( tenant = flag.String("tenant", "cgrates.org", "The type of record to use in queries.") subject = flag.String("subject", "1001", "The rating subject to use in queries.") destination = flag.String("destination", "1002", "The destination to use in queries.") + json = flag.Bool("json", false, "Use JSON RPC") nilDuration = time.Duration(0) ) @@ -107,7 +109,14 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) { result := engine.CallCost{} - client, err := rpc.Dial("tcp", *raterAddress) + var client *rpc.Client + var err error + if *json { + client, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + client, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { return nilDuration, fmt.Errorf("Could not connect to engine: %s", err.Error()) } From 02dc95d815b448503aa73d8573ab4f93602816f7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 19:25:47 +0300 Subject: [PATCH 05/14] added pubsub configs --- cmd/cgr-engine/cgr-engine.go | 54 ++++++++++++++++++++++++++++++++++ config/config.go | 40 ++++++++++++++++++++++++- config/config_defaults.go | 11 +++++++ config/config_json.go | 29 +++++++++++++++++- config/config_json_test.go | 29 ++++++++++++++++++ config/libconfig_json.go | 12 ++++++++ data/conf/cgrates/cgrates.json | 10 +++++++ engine/calldesc.go | 9 ++++-- pubsub/pubsub.go | 15 +++------- pubsub/pubsub_test.go | 13 ++++---- 10 files changed, 200 insertions(+), 22 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d18d538b1..e5b9fae17 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -35,6 +35,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" @@ -67,6 +68,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe + pubSubServer pubsub.PublisherSubscriber cdrServer *engine.CdrServer cdrStats *engine.Stats cfg *config.CGRConfig @@ -363,6 +365,46 @@ func startHistoryAgent(chanServerStarted chan struct{}) { return } +func startPubSubServer(chanDone chan struct{}) { + if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("PubSub", pubSubServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startPubSubAgent(chanServerStarted chan struct{}) { + if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting + engine.Logger.Crit(fmt.Sprintf(" Connecting internally to PubSubServer")) + select { + case <-time.After(1 * time.Minute): + engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) + exitChan <- true + return + case <-chanServerStarted: + } + //<-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here + delay := utils.Fib() + for i := 0; i < 3; i++ { //ToDo: Make it globally configurable + //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) + if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil { + break //Connected so no need to reiterate + } else if i == 2 && err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + exitChan <- true + return + } + time.Sleep(delay()) + } + } + engine.SetPubSub(pubSubServer) // scribeServer comes from global variable + return +} + // Starts the rpc server, waiting for the necessary components to finish their tasks func serveRpc(rpcWaitChans []chan struct{}) { for _, chn := range rpcWaitChans { @@ -579,6 +621,18 @@ func main() { go startHistoryAgent(histServChan) } + var pubsubServChan chan struct{} // Will be initialized only if the server starts + if cfg.PubSubServerEnabled { + pubsubServChan = make(chan struct{}) + rpcWait = append(rpcWait, pubsubServChan) + go startPubSubServer(pubsubServChan) + } + + if cfg.PubSubAgentEnabled { + engine.Logger.Info("Starting CGRateS PubSub Agent.") + go startPubSubAgent(pubsubServChan) + } + var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") diff --git a/config/config.go b/config/config.go index 989b0f655..b7226fa61 100644 --- a/config/config.go +++ b/config/config.go @@ -213,7 +213,11 @@ type CGRConfig struct { HistoryServer string // Address where to reach the master history server: HistoryServerEnabled bool // Starts History as server: . HistoryDir string // Location on disk where to store history files. - HistorySaveInterval time.Duration // The timout duration between history writes + HistorySaveInterval time.Duration // The timout duration between pubsub writes + PubSubAgentEnabled bool // Starts PubSub as an agent: . + PubSubServer string // Address where to reach the master pubsub server: + PubSubServerEnabled bool // Starts PubSub as server: . + PubSubSaveInterval time.Duration // The timout duration between pubsub writes MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user MailerAuthPass string // Authenticate to email server with this password @@ -319,6 +323,10 @@ func (self *CGRConfig) checkConfigSanity() error { if self.HistoryAgentEnabled && !self.HistoryServerEnabled { return errors.New("HistoryServer not enabled but referenced by HistoryAgent component") } + // PubSubAgent + if self.PubSubAgentEnabled && !self.PubSubServerEnabled { + return errors.New("PubSubServer not enabled but referenced by PubSubAgent component") + } return nil } @@ -411,6 +419,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } + jsnPubSubServCfg, err := jsnCfg.PubSubServJsonCfg() + if err != nil { + return err + } + + jsnPubSubAgentCfg, err := jsnCfg.PubSubAgentJsonCfg() + if err != nil { + return err + } + jsnMailerCfg, err := jsnCfg.MailerJsonCfg() if err != nil { return err @@ -678,6 +696,26 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } } + if jsnPubSubAgentCfg != nil { + if jsnPubSubAgentCfg.Enabled != nil { + self.PubSubAgentEnabled = *jsnPubSubAgentCfg.Enabled + } + if jsnPubSubAgentCfg.Server != nil { + self.PubSubServer = *jsnPubSubAgentCfg.Server + } + } + + if jsnPubSubServCfg != nil { + if jsnPubSubServCfg.Enabled != nil { + self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled + } + if jsnPubSubServCfg.Save_interval != nil { + if self.PubSubSaveInterval, err = utils.ParseDurationWithSecs(*jsnPubSubServCfg.Save_interval); err != nil { + return err + } + } + } + if jsnMailerCfg != nil { if jsnMailerCfg.Server != nil { self.MailerServer = *jsnMailerCfg.Server diff --git a/config/config_defaults.go b/config/config_defaults.go index 5d59cf5d8..786f57250 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -243,6 +243,17 @@ const CGRATES_CFG_JSON = ` "server": "internal", // address where to reach the master history server: }, +"pubsub_server": { + "enabled": false, // starts History service: . + "save_interval": "1s", // interval to save changed cache into .git archive +}, + + +"pubsub_agent": { + "enabled": false, // starts PubSub as a client: . + "server": "internal", // address where to reach the master pubsub server: +}, + "mailer": { "server": "localhost", // the server to use when sending emails out diff --git a/config/config_json.go b/config/config_json.go index 6ccb5caea..7ef0ca3bd 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -20,9 +20,10 @@ package config import ( "encoding/json" - "github.com/DisposaBoy/JsonConfigReader" "io" "os" + + "github.com/DisposaBoy/JsonConfigReader" ) const ( @@ -48,6 +49,8 @@ const ( OSIPS_JSN = "opensips" HISTSERV_JSN = "history_server" HISTAGENT_JSN = "history_agent" + PUBSUBSERV_JSN = "pubsub_server" + PUBSUBAGENT_JSN = "pubsub_agent" MAILER_JSN = "mailer" ) @@ -254,6 +257,30 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) { return cfg, nil } +func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) { + rawCfg, hasKey := self[HISTSERV_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubServJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + +func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) { + rawCfg, hasKey := self[HISTAGENT_JSN] + if !hasKey { + return nil, nil + } + cfg := new(PubSubAgentJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) { rawCfg, hasKey := self[MAILER_JSN] if !hasKey { diff --git a/config/config_json_test.go b/config/config_json_test.go index 8faa57124..cde9a43d5 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -406,6 +406,30 @@ func TestDfHistAgentJsonCfg(t *testing.T) { } } +func TestDfPubSubServJsonCfg(t *testing.T) { + eCfg := &PubSubServJsonCfg{ + Enabled: utils.BoolPointer(false), + Save_interval: utils.StringPointer("1s"), + } + if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + +func TestDfPubSubAgentJsonCfg(t *testing.T) { + eCfg := &PubSubAgentJsonCfg{ + Enabled: utils.BoolPointer(false), + Server: utils.StringPointer("internal"), + } + if cfg, err := dfCgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Error("Received: ", cfg) + } +} + func TestDfMailerJsonCfg(t *testing.T) { eCfg := &MailerJsonCfg{ Server: utils.StringPointer("localhost"), @@ -463,6 +487,11 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) { } else if cfg != nil { t.Error("Received: ", cfg) } + if cfg, err := cgrJsonCfg.PubSubAgentJsonCfg(); err != nil { + t.Error(err) + } else if cfg != nil { + t.Error("Received: ", cfg) + } eCfgSmFs := &SmFsJsonCfg{ Enabled: utils.BoolPointer(true), Connections: &[]*FsConnJsonCfg{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4df2fe490..cb1a37c80 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -217,6 +217,18 @@ type HistAgentJsonCfg struct { Server *string } +// PubSub server config section +type PubSubServJsonCfg struct { + Enabled *bool + Save_interval *string +} + +// PubSub agent config section +type PubSubAgentJsonCfg struct { + Enabled *bool + Server *string +} + // Mailer config section type MailerJsonCfg struct { Server *string diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 9fab1711f..905f73855 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -222,6 +222,16 @@ // "server": "internal", // address where to reach the master history server: //}, +//"pubsub_server": { +// "enabled": false, // starts pubsub service: . +// "save_interval": "1s", // interval to save subscribers +//}, + + +//"pubsub_agent": { +// "enabled": false, // starts pubsub as a client: . +// "server": "internal", // address where to reach the master pubsub server: +//}, //"mailer": { // "server": "localhost", // the server to use when sending emails out diff --git a/engine/calldesc.go b/engine/calldesc.go index 01ba1da85..65ecfdd58 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -21,15 +21,15 @@ package engine import ( "errors" "fmt" - //"log" "log/syslog" "sort" "strings" "time" - //"encoding/json" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/utils" ) @@ -68,6 +68,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 10 historyScribe history.Scribe + pubSubServer pubsub.PublisherSubscriber //historyScribe, _ = history.NewMockScribe() ) @@ -104,6 +105,10 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } +func SetPubSub(ps pubsub.PublisherSubscriber) { + pubSubServer = ps +} + /* The input stucture that contains call information. */ diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index ce4b05886..28f890517 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -2,12 +2,9 @@ package pubsub import ( "errors" - "fmt" "sync" "time" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -32,14 +29,14 @@ type PublisherSubscriber interface { type PubSub struct { subscribers map[string]map[string]time.Time - conf *config.CGRConfig + ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex } -func NewPubSub(conf *config.CGRConfig) *PubSub { +func NewPubSub(ttlVerify bool) *PubSub { return &PubSub{ - conf: conf, + ttlVerify: ttlVerify, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, @@ -84,7 +81,6 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { for transport_address, expTime := range subs { split := utils.InfieldSplit(transport_address) if len(split) != 2 { - engine.Logger.Warning(" Wrong transport;address pair: " + transport_address) continue } transport := split[0] @@ -98,11 +94,8 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { + if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { break // Success, no need to reinterate - } else if i == 4 { // Last iteration, syslog the warning - engine.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) - break } time.Sleep(delay()) } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 74c964dba..71a94fe64 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -4,12 +4,11 @@ import ( "testing" "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) func TestSubscribe(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -25,7 +24,7 @@ func TestSubscribe(t *testing.T) { } func TestSubscribeNoTransport(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -38,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) { } func TestSubscribeNoExpire(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -54,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ps := NewPubSub(nil) + ps := NewPubSub(false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -77,7 +76,7 @@ func TestUnsubscribe(t *testing.T) { } func TestPublish(t *testing.T) { - ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps := NewPubSub(true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { obj.(map[string]string)["called"] = url return nil, nil @@ -111,7 +110,7 @@ func TestPublish(t *testing.T) { } func TestPublishExpired(t *testing.T) { - ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps := NewPubSub(true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { m := obj.(map[string]string) m["called"] = "yes" From e7f0b617ec2c4e0b8f87925c359fff8ad1687bc4 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 20:34:32 +0300 Subject: [PATCH 06/14] first successful live test --- cmd/cgr-engine/cgr-engine.go | 2 +- config/config_json.go | 4 ++-- data/conf/samples/cgradmin/cgradmin.json | 11 +++++++++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e5b9fae17..c46775254 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -371,7 +371,7 @@ func startPubSubServer(chanDone chan struct{}) { exitChan <- true return } - server.RpcRegisterName("PubSub", pubSubServer) + server.RpcRegisterName("PubSubV1", pubSubServer) close(chanDone) } diff --git a/config/config_json.go b/config/config_json.go index 7ef0ca3bd..213ff65c2 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -258,7 +258,7 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) { } func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) { - rawCfg, hasKey := self[HISTSERV_JSN] + rawCfg, hasKey := self[PUBSUBSERV_JSN] if !hasKey { return nil, nil } @@ -270,7 +270,7 @@ func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) { } func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) { - rawCfg, hasKey := self[HISTAGENT_JSN] + rawCfg, hasKey := self[PUBSUBAGENT_JSN] if !hasKey { return nil, nil } diff --git a/data/conf/samples/cgradmin/cgradmin.json b/data/conf/samples/cgradmin/cgradmin.json index 46f6fae5d..f8fcb0475 100644 --- a/data/conf/samples/cgradmin/cgradmin.json +++ b/data/conf/samples/cgradmin/cgradmin.json @@ -17,4 +17,15 @@ "scheduler": { "enabled": true, // start Scheduler service: }, + +"pubsub_server": { + "enabled": true, // starts pubsub service: . + "save_interval": "1m", // interval to save subscribers +}, + + +"pubsub_agent": { + "enabled": true, // starts pubsub as a client: . + "server": "internal", // address where to reach the master pubsub server: +}, } From 1fb051ef7a4ff60b5dda197f8656c941f88243b4 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 22:12:22 +0300 Subject: [PATCH 07/14] refactoring and saving subscribers --- cmd/cgr-engine/cgr-engine.go | 18 ++++++------- cmd/cgr-engine/registration.go | 4 ++- engine/calldesc.go | 5 ++-- {pubsub => engine}/pubsub.go | 44 ++++++++++++++++++++++++++----- {pubsub => engine}/pubsub_test.go | 14 +++++----- engine/storage_interface.go | 3 +++ engine/storage_map.go | 18 +++++++++++++ engine/storage_redis.go | 27 +++++++++++++++++++ utils/consts.go | 1 + 9 files changed, 107 insertions(+), 27 deletions(-) rename {pubsub => engine}/pubsub.go (69%) rename {pubsub => engine}/pubsub_test.go (92%) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c46775254..693fdf5b5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -35,7 +35,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" @@ -68,7 +67,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe - pubSubServer pubsub.PublisherSubscriber + pubSubServer engine.PublisherSubscriber cdrServer *engine.CdrServer cdrStats *engine.Stats cfg *config.CGRConfig @@ -338,7 +337,7 @@ func startHistoryServer(chanDone chan struct{}) { // chanStartServer will report when server is up, useful for internal requests func startHistoryAgent(chanServerStarted chan struct{}) { if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting - engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + //engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) select { case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) @@ -365,8 +364,8 @@ func startHistoryAgent(chanServerStarted chan struct{}) { return } -func startPubSubServer(chanDone chan struct{}) { - if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil { +func startPubSubServer(chanDone chan struct{}, accountDb engine.AccountingStorage) { + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true return @@ -376,9 +375,8 @@ func startPubSubServer(chanDone chan struct{}) { } // chanStartServer will report when server is up, useful for internal requests -func startPubSubAgent(chanServerStarted chan struct{}) { +func startPubSubAgent(chanServerStarted chan struct{}, accountDb engine.AccountingStorage) { if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting - engine.Logger.Crit(fmt.Sprintf(" Connecting internally to PubSubServer")) select { case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) @@ -391,7 +389,7 @@ func startPubSubAgent(chanServerStarted chan struct{}) { delay := utils.Fib() for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) - if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil { + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err == nil { break //Connected so no need to reiterate } else if i == 2 && err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) @@ -625,12 +623,12 @@ func main() { if cfg.PubSubServerEnabled { pubsubServChan = make(chan struct{}) rpcWait = append(rpcWait, pubsubServChan) - go startPubSubServer(pubsubServChan) + go startPubSubServer(pubsubServChan, accountDb) } if cfg.PubSubAgentEnabled { engine.Logger.Info("Starting CGRateS PubSub Agent.") - go startPubSubAgent(pubsubServChan) + go startPubSubAgent(pubsubServChan, accountDb) } var cdrsChan chan struct{} diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index a929eaada..97837b68b 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -49,7 +49,9 @@ func generalSignalHandler() { sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig)) var dummyInt int - cdrStats.Stop(dummyInt, &dummyInt) + if cdrStats != nil { + cdrStats.Stop(dummyInt, &dummyInt) + } exitChan <- true } diff --git a/engine/calldesc.go b/engine/calldesc.go index 65ecfdd58..2e45d3c54 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -29,7 +29,6 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/utils" ) @@ -68,7 +67,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 10 historyScribe history.Scribe - pubSubServer pubsub.PublisherSubscriber + pubSubServer PublisherSubscriber //historyScribe, _ = history.NewMockScribe() ) @@ -105,7 +104,7 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } -func SetPubSub(ps pubsub.PublisherSubscriber) { +func SetPubSub(ps PublisherSubscriber) { pubSubServer = ps } diff --git a/pubsub/pubsub.go b/engine/pubsub.go similarity index 69% rename from pubsub/pubsub.go rename to engine/pubsub.go index 28f890517..8bc8ecee8 100644 --- a/pubsub/pubsub.go +++ b/engine/pubsub.go @@ -1,7 +1,8 @@ -package pubsub +package engine import ( "errors" + "fmt" "sync" "time" @@ -32,14 +33,38 @@ type PubSub struct { ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex + accountDb AccountingStorage } -func NewPubSub(ttlVerify bool) *PubSub { - return &PubSub{ +func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { + ps := &PubSub{ ttlVerify: ttlVerify, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, + accountDb: accountDb, + } + // load subscribers + if subs, err := accountDb.GetPubSubSubscribers(); err == nil { + ps.subscribers = subs + } + return ps +} + +func (ps *PubSub) saveSubscribers(key string) { + if key != "" { + if _, found := ps.subscribers[key]; !found { + return + } + if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } else { // save all + for key, valueMap := range ps.subscribers { + if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } } } @@ -58,6 +83,7 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { expTime = time.Now().Add(si.LifeSpan) } ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime + ps.saveSubscribers(si.EventName) *reply = utils.OK return nil } @@ -70,6 +96,7 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return errors.New(*reply) } delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) + ps.saveSubscribers(si.EventName) *reply = utils.OK return nil } @@ -78,15 +105,17 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() subs := ps.subscribers[pi.Event["EventName"]] - for transport_address, expTime := range subs { - split := utils.InfieldSplit(transport_address) + for transportAddress, expTime := range subs { + split := utils.InfieldSplit(transportAddress) if len(split) != 2 { + Logger.Warning(" Wrong transport;address pair: " + transportAddress) continue } transport := split[0] address := split[1] if !expTime.IsZero() && expTime.Before(time.Now()) { - delete(subs, transport_address) + delete(subs, transportAddress) + ps.saveSubscribers(pi.Event["EventName"]) continue // subscription expired, do not send event } switch transport { @@ -96,6 +125,9 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + break } time.Sleep(delay()) } diff --git a/pubsub/pubsub_test.go b/engine/pubsub_test.go similarity index 92% rename from pubsub/pubsub_test.go rename to engine/pubsub_test.go index 71a94fe64..6571604ee 100644 --- a/pubsub/pubsub_test.go +++ b/engine/pubsub_test.go @@ -1,4 +1,4 @@ -package pubsub +package engine import ( "testing" @@ -8,7 +8,7 @@ import ( ) func TestSubscribe(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -24,7 +24,7 @@ func TestSubscribe(t *testing.T) { } func TestSubscribeNoTransport(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -37,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) { } func TestSubscribeNoExpire(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -53,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -76,7 +76,7 @@ func TestUnsubscribe(t *testing.T) { } func TestPublish(t *testing.T) { - ps := NewPubSub(true) + ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { obj.(map[string]string)["called"] = url return nil, nil @@ -110,7 +110,7 @@ func TestPublish(t *testing.T) { } func TestPublishExpired(t *testing.T) { - ps := NewPubSub(true) + ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { m := obj.(map[string]string) m["called"] = "yes" diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 894905304..631d56f03 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -23,6 +23,7 @@ import ( "encoding/gob" "encoding/json" "reflect" + "time" "github.com/cgrates/cgrates/utils" "github.com/ugorji/go/codec" @@ -79,6 +80,8 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error + GetPubSubSubscribers() (map[string]map[string]time.Time, error) + SetPubSubSubscribers(string, map[string]time.Time) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index e350c5ff6..01ec265c8 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,6 +561,24 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { + result = make(map[string]map[string]time.Time) + for key, value := range ms.dict { + if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { + subs := make(map[string]time.Time) + if err = ms.ms.Unmarshal(value, subs); err == nil { + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } + } + } + return +} +func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { + result, err := ms.ms.Marshal(subs) + ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key] = result + return +} + func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) { if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 41bb26b74..42a7164f0 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,6 +23,7 @@ import ( "compress/zlib" "errors" "fmt" + "log" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -689,6 +690,32 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { + keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") + if err != nil { + return nil, err + } + result = make(map[string]map[string]time.Time) + for _, key := range keys { + log.Print("KEY: ", key) + if values, err := rs.db.Get(key); err == nil { + subs := make(map[string]time.Time) + err = rs.ms.Unmarshal(values, subs) + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } else { + return nil, utils.ErrNotFound + } + } + log.Print("XXX: ", result) + return +} + +func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { + result, err := rs.ms.Marshal(subs) + rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) + return +} + func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) { var values []byte if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil { diff --git a/utils/consts.go b/utils/consts.go index 5a11dc0c0..458b426e1 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -165,6 +165,7 @@ const ( LCR_PREFIX = "lcr_" DERIVEDCHARGERS_PREFIX = "dcs_" CDR_STATS_QUEUE_PREFIX = "csq_" + PUBSUB_SUBSCRIBERS_PREFIX = "pss_" CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" LOG_CALL_COST_PREFIX = "cco_" From 342415a6ad9e99e87193646eded4911a9b3bac17 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 22:15:38 +0300 Subject: [PATCH 08/14] removed logging --- engine/storage_redis.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 42a7164f0..0270133de 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,7 +23,6 @@ import ( "compress/zlib" "errors" "fmt" - "log" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -697,7 +696,6 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]tim } result = make(map[string]map[string]time.Time) for _, key := range keys { - log.Print("KEY: ", key) if values, err := rs.db.Get(key); err == nil { subs := make(map[string]time.Time) err = rs.ms.Unmarshal(values, subs) @@ -706,7 +704,6 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]tim return nil, utils.ErrNotFound } } - log.Print("XXX: ", result) return } From 9c9465e1fc0ce2a13f40a1ad50c2d580ad9ddf9a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 09:16:45 +0300 Subject: [PATCH 09/14] removed pubsub save interval --- config/config.go | 6 ------ config/config_json_test.go | 3 +-- config/libconfig_json.go | 3 +-- data/conf/cgrates/cgrates.json | 1 - data/conf/samples/cgradmin/cgradmin.json | 1 - 5 files changed, 2 insertions(+), 12 deletions(-) diff --git a/config/config.go b/config/config.go index b7226fa61..e78aad64a 100644 --- a/config/config.go +++ b/config/config.go @@ -217,7 +217,6 @@ type CGRConfig struct { PubSubAgentEnabled bool // Starts PubSub as an agent: . PubSubServer string // Address where to reach the master pubsub server: PubSubServerEnabled bool // Starts PubSub as server: . - PubSubSaveInterval time.Duration // The timout duration between pubsub writes MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user MailerAuthPass string // Authenticate to email server with this password @@ -709,11 +708,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnPubSubServCfg.Enabled != nil { self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled } - if jsnPubSubServCfg.Save_interval != nil { - if self.PubSubSaveInterval, err = utils.ParseDurationWithSecs(*jsnPubSubServCfg.Save_interval); err != nil { - return err - } - } } if jsnMailerCfg != nil { diff --git a/config/config_json_test.go b/config/config_json_test.go index cde9a43d5..960fe8862 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -408,8 +408,7 @@ func TestDfHistAgentJsonCfg(t *testing.T) { func TestDfPubSubServJsonCfg(t *testing.T) { eCfg := &PubSubServJsonCfg{ - Enabled: utils.BoolPointer(false), - Save_interval: utils.StringPointer("1s"), + Enabled: utils.BoolPointer(false), } if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil { t.Error(err) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index cb1a37c80..f01c7dd77 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -219,8 +219,7 @@ type HistAgentJsonCfg struct { // PubSub server config section type PubSubServJsonCfg struct { - Enabled *bool - Save_interval *string + Enabled *bool } // PubSub agent config section diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 905f73855..6b1a4b031 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -224,7 +224,6 @@ //"pubsub_server": { // "enabled": false, // starts pubsub service: . -// "save_interval": "1s", // interval to save subscribers //}, diff --git a/data/conf/samples/cgradmin/cgradmin.json b/data/conf/samples/cgradmin/cgradmin.json index f8fcb0475..158e85047 100644 --- a/data/conf/samples/cgradmin/cgradmin.json +++ b/data/conf/samples/cgradmin/cgradmin.json @@ -20,7 +20,6 @@ "pubsub_server": { "enabled": true, // starts pubsub service: . - "save_interval": "1m", // interval to save subscribers }, From 54fa1476d946533eae31e2de3e090ab740020110 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 09:59:26 +0300 Subject: [PATCH 10/14] added console commands and ShowSubscribers method --- console/publish.go | 64 +++++++++++++++++++++++++++++++++++++ console/show_subscribers.go | 64 +++++++++++++++++++++++++++++++++++++ console/subscribe.go | 64 +++++++++++++++++++++++++++++++++++++ console/unsubscribe.go | 64 +++++++++++++++++++++++++++++++++++++ engine/pubsub.go | 16 ++++++++-- 5 files changed, 269 insertions(+), 3 deletions(-) create mode 100644 console/publish.go create mode 100644 console/show_subscribers.go create mode 100644 console/subscribe.go create mode 100644 console/unsubscribe.go diff --git a/console/publish.go b/console/publish.go new file mode 100644 index 000000000..a407c8023 --- /dev/null +++ b/console/publish.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdPublish{ + name: "publish", + rpcMethod: "PubSubV1.Publish", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdPublish struct { + name string + rpcMethod string + rpcParams *engine.PublishInfo + *CommandExecuter +} + +func (self *CmdPublish) Name() string { + return self.name +} + +func (self *CmdPublish) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdPublish) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.PublishInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdPublish) PostprocessRpcParams() error { + return nil +} + +func (self *CmdPublish) RpcResult() interface{} { + var s string + return &s +} diff --git a/console/show_subscribers.go b/console/show_subscribers.go new file mode 100644 index 000000000..b4591328d --- /dev/null +++ b/console/show_subscribers.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "time" + +func init() { + c := &CmdShowSubscribers{ + name: "show_subscribers", + rpcMethod: "PubSubV1.ShowSubscribers", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdShowSubscribers struct { + name string + rpcMethod string + rpcParams *StringWrapper + *CommandExecuter +} + +func (self *CmdShowSubscribers) Name() string { + return self.name +} + +func (self *CmdShowSubscribers) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdShowSubscribers) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &StringWrapper{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdShowSubscribers) PostprocessRpcParams() error { + return nil +} + +func (self *CmdShowSubscribers) RpcResult() interface{} { + var s map[string]map[string]time.Time + return &s +} diff --git a/console/subscribe.go b/console/subscribe.go new file mode 100644 index 000000000..ee8c27bd7 --- /dev/null +++ b/console/subscribe.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdSubscribe{ + name: "subscribe", + rpcMethod: "PubSubV1.Subscribe", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdSubscribe struct { + name string + rpcMethod string + rpcParams *engine.SubscribeInfo + *CommandExecuter +} + +func (self *CmdSubscribe) Name() string { + return self.name +} + +func (self *CmdSubscribe) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdSubscribe) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.SubscribeInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdSubscribe) PostprocessRpcParams() error { + return nil +} + +func (self *CmdSubscribe) RpcResult() interface{} { + var s string + return &s +} diff --git a/console/unsubscribe.go b/console/unsubscribe.go new file mode 100644 index 000000000..66f02d741 --- /dev/null +++ b/console/unsubscribe.go @@ -0,0 +1,64 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/engine" + +func init() { + c := &CmdUnsubscribe{ + name: "unsubscribe", + rpcMethod: "PubSubV1.Unsubscribe", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdUnsubscribe struct { + name string + rpcMethod string + rpcParams *engine.SubscribeInfo + *CommandExecuter +} + +func (self *CmdUnsubscribe) Name() string { + return self.name +} + +func (self *CmdUnsubscribe) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdUnsubscribe) RpcParams(ptr bool) interface{} { + if self.rpcParams == nil { + self.rpcParams = &engine.SubscribeInfo{} + } + if ptr { + return self.rpcParams + } + return *self.rpcParams +} + +func (self *CmdUnsubscribe) PostprocessRpcParams() error { + return nil +} + +func (self *CmdUnsubscribe) RpcResult() interface{} { + var s string + return &s +} diff --git a/engine/pubsub.go b/engine/pubsub.go index 8bc8ecee8..65b9b3cc7 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -26,6 +26,7 @@ type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error + ShowSubscribers(string, *map[string]map[string]time.Time) error } type PubSub struct { @@ -138,6 +139,11 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { return nil } +func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]time.Time) error { + *out = ps.subscribers + return nil +} + type ProxyPubSub struct { Client *rpcclient.RpcClient } @@ -151,11 +157,15 @@ func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) { } func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { - return ps.Client.Call("PubSub.Subscribe", si, reply) + return ps.Client.Call("PubSubV1.Subscribe", si, reply) } func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { - return ps.Client.Call("PubSub.Unsubscribe", si, reply) + return ps.Client.Call("PubSubV1.Unsubscribe", si, reply) } func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { - return ps.Client.Call("PubSub.Publish", pi, reply) + return ps.Client.Call("PubSubV1.Publish", pi, reply) +} + +func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]time.Time) error { + return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) } From 285936f02bdbc3afeb59b692ccd7674ac2ac3611 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 10:15:42 +0300 Subject: [PATCH 11/14] added tests for pubusub save function --- engine/pubsub_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++ engine/storage_map.go | 2 +- 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index 6571604ee..d693414f0 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -23,6 +23,23 @@ func TestSubscribe(t *testing.T) { } } +func TestSubscribeSave(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 1 { + t.Error("Error saving subscribers: ", err, subs) + } +} + func TestSubscribeNoTransport(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string @@ -75,6 +92,30 @@ func TestUnsubscribe(t *testing.T) { } } +func TestUnsubscribeSave(t *testing.T) { + ps := NewPubSub(accountingStorage, false) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Unsubscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + }, &r); err != nil { + t.Error("Error unsubscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 0 { + t.Error("Error saving subscribers: ", err, subs) + } +} + func TestPublish(t *testing.T) { ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { @@ -134,3 +175,34 @@ func TestPublishExpired(t *testing.T) { t.Error("Error removing expired subscribers: ", ps.subscribers) } } + +func TestPublishExpiredSave(t *testing.T) { + ps := NewPubSub(accountingStorage, true) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + m := obj.(map[string]string) + m["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + subs, err := accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 1 { + t.Error("Error saving subscribers: ", err, subs) + } + if err := ps.Publish(PublishInfo{ + Event: map[string]string{"EventName": "test"}, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + subs, err = accountingStorage.GetPubSubSubscribers() + if err != nil || len(subs["test"]) != 0 { + t.Error("Error saving subscribers: ", err, subs) + } +} diff --git a/engine/storage_map.go b/engine/storage_map.go index 01ec265c8..f64d00d73 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -575,7 +575,7 @@ func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time. } func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { result, err := ms.ms.Marshal(subs) - ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key] = result + ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result return } From 567b1e77dd8a26c8abf6d038cebec7693dec732a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 1 Jul 2015 16:47:54 +0300 Subject: [PATCH 12/14] json rpc for cgr tester --- cmd/cgr-tester/cgr-tester.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 174d50c01..1c3999a3d 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "net/rpc" + "net/rpc/jsonrpc" "os" "runtime" "runtime/pprof" @@ -58,6 +59,7 @@ var ( tenant = flag.String("tenant", "cgrates.org", "The type of record to use in queries.") subject = flag.String("subject", "1001", "The rating subject to use in queries.") destination = flag.String("destination", "1002", "The destination to use in queries.") + json = flag.Bool("json", false, "Use JSON RPC") nilDuration = time.Duration(0) ) @@ -107,7 +109,14 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) { result := engine.CallCost{} - client, err := rpc.Dial("tcp", *raterAddress) + var client *rpc.Client + var err error + if *json { + client, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + client, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { return nilDuration, fmt.Errorf("Could not connect to engine: %s", err.Error()) } From 98d416b91c27f9c29281588c3a4a3e63e7248093 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 10:21:38 +0300 Subject: [PATCH 13/14] removed pubsub dir from tests --- test.sh | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test.sh b/test.sh index 7c605b910..48c9fa8d4 100755 --- a/test.sh +++ b/test.sh @@ -11,7 +11,6 @@ go test -i github.com/cgrates/cgrates/cdrc go test -i github.com/cgrates/cgrates/utils go test -i github.com/cgrates/cgrates/history go test -i github.com/cgrates/cgrates/cdre -go test -i github.com/cgrates/cgrates/pubsub go test github.com/cgrates/cgrates/apier/v1 v1=$? @@ -37,7 +36,5 @@ go test github.com/cgrates/cgrates/cache2go c2g=$? go test github.com/cgrates/cgrates/cdre cdre=$? -go test github.com/cgrates/cgrates/pubsub -ps=$? -exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre && $ps +exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre From 897c6fddf6ffd733c6cf15bde8e342e12bba841a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 11:49:17 +0300 Subject: [PATCH 14/14] added filters in subscriber data --- console/show_subscribers.go | 4 ++-- engine/pubsub.go | 31 ++++++++++++++++++++++--------- engine/pubsub_test.go | 4 ++-- engine/storage_interface.go | 5 ++--- engine/storage_map.go | 10 +++++----- engine/storage_redis.go | 10 +++++----- 6 files changed, 38 insertions(+), 26 deletions(-) diff --git a/console/show_subscribers.go b/console/show_subscribers.go index b4591328d..7ddae5af0 100644 --- a/console/show_subscribers.go +++ b/console/show_subscribers.go @@ -18,7 +18,7 @@ along with this program. If not, see package console -import "time" +import "github.com/cgrates/cgrates/engine" func init() { c := &CmdShowSubscribers{ @@ -59,6 +59,6 @@ func (self *CmdShowSubscribers) PostprocessRpcParams() error { } func (self *CmdShowSubscribers) RpcResult() interface{} { - var s map[string]map[string]time.Time + var s map[string]map[string]*engine.SubscriberData return &s } diff --git a/engine/pubsub.go b/engine/pubsub.go index 65b9b3cc7..6784f423d 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -26,11 +26,16 @@ type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error - ShowSubscribers(string, *map[string]map[string]time.Time) error + ShowSubscribers(string, *map[string]map[string]*SubscriberData) error +} + +type SubscriberData struct { + ExpTime time.Time + Filters utils.RSRFields } type PubSub struct { - subscribers map[string]map[string]time.Time + subscribers map[string]map[string]*SubscriberData ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex @@ -40,7 +45,7 @@ type PubSub struct { func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { ps := &PubSub{ ttlVerify: ttlVerify, - subscribers: make(map[string]map[string]time.Time), + subscribers: make(map[string]map[string]*SubscriberData), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, accountDb: accountDb, @@ -77,13 +82,21 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { return errors.New(*reply) } if ps.subscribers[si.EventName] == nil { - ps.subscribers[si.EventName] = make(map[string]time.Time) + ps.subscribers[si.EventName] = make(map[string]*SubscriberData) } var expTime time.Time if si.LifeSpan > 0 { expTime = time.Now().Add(si.LifeSpan) } - ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime + rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP) + if err != nil { + *reply = err.Error() + return err + } + ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{ + ExpTime: expTime, + Filters: rsr, + } ps.saveSubscribers(si.EventName) *reply = utils.OK return nil @@ -106,7 +119,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() subs := ps.subscribers[pi.Event["EventName"]] - for transportAddress, expTime := range subs { + for transportAddress, subData := range subs { split := utils.InfieldSplit(transportAddress) if len(split) != 2 { Logger.Warning(" Wrong transport;address pair: " + transportAddress) @@ -114,7 +127,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { } transport := split[0] address := split[1] - if !expTime.IsZero() && expTime.Before(time.Now()) { + if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { delete(subs, transportAddress) ps.saveSubscribers(pi.Event["EventName"]) continue // subscription expired, do not send event @@ -139,7 +152,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { return nil } -func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]time.Time) error { +func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error { *out = ps.subscribers return nil } @@ -166,6 +179,6 @@ func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { return ps.Client.Call("PubSubV1.Publish", pi, reply) } -func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]time.Time) error { +func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error { return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) } diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index d693414f0..6e027a7b8 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -18,7 +18,7 @@ func TestSubscribe(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() { + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() { t.Error("Error adding subscriber: ", ps.subscribers) } } @@ -64,7 +64,7 @@ func TestSubscribeNoExpire(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() { + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() { t.Error("Error adding no expire subscriber: ", ps.subscribers) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 631d56f03..faa8fc560 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "encoding/json" "reflect" - "time" "github.com/cgrates/cgrates/utils" "github.com/ugorji/go/codec" @@ -80,8 +79,8 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error - GetPubSubSubscribers() (map[string]map[string]time.Time, error) - SetPubSubSubscribers(string, map[string]time.Time) error + GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error) + SetPubSubSubscribers(string, map[string]*SubscriberData) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index f64d00d73..b729bd102 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,19 +561,19 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { - result = make(map[string]map[string]time.Time) +func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { + result = make(map[string]map[string]*SubscriberData) for key, value := range ms.dict { if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { - subs := make(map[string]time.Time) - if err = ms.ms.Unmarshal(value, subs); err == nil { + subs := make(map[string]*SubscriberData) + if err = ms.ms.Unmarshal(value, &subs); err == nil { result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs } } } return } -func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { +func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { result, err := ms.ms.Marshal(subs) ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result return diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 0270133de..c571d7622 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -689,16 +689,16 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { +func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") if err != nil { return nil, err } - result = make(map[string]map[string]time.Time) + result = make(map[string]map[string]*SubscriberData) for _, key := range keys { if values, err := rs.db.Get(key); err == nil { - subs := make(map[string]time.Time) - err = rs.ms.Unmarshal(values, subs) + subs := make(map[string]*SubscriberData) + err = rs.ms.Unmarshal(values, &subs) result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs } else { return nil, utils.ErrNotFound @@ -707,7 +707,7 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]tim return } -func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { +func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { result, err := rs.ms.Marshal(subs) rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) return