first pubsub tests

This commit is contained in:
Radu Ioan Fericean
2015-06-30 22:54:06 +03:00
parent e99b3ff16c
commit 173b025d8f
3 changed files with 139 additions and 10 deletions

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -11,7 +13,7 @@ import (
type SubscribeInfo struct {
EventType string
PostUrl string
LiveDuration time.Duartion
LiveDuration time.Duration
}
type PublishInfo struct {
@@ -27,13 +29,15 @@ type PublishSubscriber interface {
type PubSub struct {
subscribers map[string]map[string]time.Time
conf *CGRConfig
conf *config.CGRConfig
pubFunc func(string, bool, interface{}) ([]byte, error)
}
func NewPubSub(conf *CGRConfig) *PubSub {
func NewPubSub(conf *config.CGRConfig) *PubSub {
return &PubSub{
conf: conf,
subscribers: make(map[string]map[string]time.Time),
pubFunc: utils.HttpJsonPost,
}
}
@@ -41,7 +45,11 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
if ps.subscribers[si.EventType] == nil {
ps.subscribers[si.EventType] = make(map[string]time.Time)
}
ps.subscribers[si.EventType][si.PostUrl] = time.Now().Add(si.LiveDuration)
var expTime time.Time
if si.LiveDuration > 0 {
expTime = time.Now().Add(si.LiveDuration)
}
ps.subscribers[si.EventType][si.PostUrl] = expTime
*reply = utils.OK
return nil
}
@@ -52,10 +60,10 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
return nil
}
func (ps *PubSub) Publish(pi PublishInfo, replay *string) error {
func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
subs := ps.subscribers[pi.EventType]
for postURL, expTime := range subs {
if expTime.After(time.Now) {
if !expTime.IsZero() && expTime.Before(time.Now()) {
delete(subs, postURL)
continue // subscription expired, do not send event
}
@@ -63,16 +71,17 @@ func (ps *PubSub) Publish(pi PublishInfo, replay *string) error {
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err = utils.HttpJsonPost(url, ps.cfg.HttpSkipTlsVerify, pi.Event); err == nil {
if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
Logger.Warning(fmt.Sprintf("<PubSub> WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType))
engine.Logger.Warning(fmt.Sprintf("<PubSub> WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType))
break
}
time.Sleep(delay())
}
}()
}
*reply = utils.OK
return nil
}

118
pubsub/pubsub_test.go Normal file
View File

@@ -0,0 +1,118 @@
package pubsub
import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
)
func TestSubscribe(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() {
t.Error("Error adding subscriber: ", ps.subscribers)
}
}
func TestSubscribeNoExpire(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: 0,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() {
t.Error("Error adding no expire subscriber: ", ps.subscribers)
}
}
func TestUnsubscribe(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Unsubscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
}, &r); err != nil {
t.Error("Error unsubscribing: ", err)
}
if _, exists := ps.subscribers["test"]["url"]; exists {
t.Error("Error adding subscriber: ", ps.subscribers)
}
}
func TestPublish(t *testing.T) {
ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true})
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
obj.(map[string]string)["called"] = "yes"
return nil, nil
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
m := make(map[string]string)
if err := ps.Publish(PublishInfo{
EventType: "test",
Event: m,
}, &r); err != nil {
t.Error("Error publishing: ", err)
}
for i := 0; i < 1000; i++ { // wait for the theread to populate map
if len(m) == 0 {
time.Sleep(time.Microsecond)
} else {
break
}
}
if r, exists := m["called"]; !exists || r != "yes" {
t.Error("Error calling publish function: ", m)
}
}
func TestPublishExpired(t *testing.T) {
ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true})
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
m := obj.(map[string]string)
m["called"] = "yes"
return nil, nil
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: 1,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Publish(PublishInfo{
EventType: "test",
Event: nil,
}, &r); err != nil {
t.Error("Error publishing: ", err)
}
if len(ps.subscribers["test"]) != 0 {
t.Error("Error removing expired subscribers: ", ps.subscribers)
}
}

View File

@@ -11,6 +11,7 @@ go test -i github.com/cgrates/cgrates/cdrc
go test -i github.com/cgrates/cgrates/utils
go test -i github.com/cgrates/cgrates/history
go test -i github.com/cgrates/cgrates/cdre
go test -i github.com/cgrates/cgrates/pubsub
go test github.com/cgrates/cgrates/apier/v1
v1=$?
@@ -36,6 +37,7 @@ go test github.com/cgrates/cgrates/cache2go
c2g=$?
go test github.com/cgrates/cgrates/cdre
cdre=$?
go test github.com/cgrates/cgrates/pubsub
ps=$?
exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre
exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre && $ps