added transport and sync locks

This commit is contained in:
Radu Ioan Fericean
2015-07-01 12:03:15 +03:00
parent 173b025d8f
commit 66ac4b194c
3 changed files with 116 additions and 55 deletions

View File

@@ -1,7 +1,9 @@
package pubsub
import (
"errors"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -11,17 +13,18 @@ import (
)
type SubscribeInfo struct {
EventType string
PostUrl string
LiveDuration time.Duration
EventName string
EventFilter string
Transport string
Address string
LifeSpan time.Duration
}
type PublishInfo struct {
EventType string
Event map[string]string
Event map[string]string
}
type PublishSubscriber interface {
type PublisherSubscriber interface {
Subscribe(SubscribeInfo, *string) error
Unsubscribe(SubscribeInfo, *string) error
Publish(PublishInfo, *string) error
@@ -31,6 +34,7 @@ type PubSub struct {
subscribers map[string]map[string]time.Time
conf *config.CGRConfig
pubFunc func(string, bool, interface{}) ([]byte, error)
mux *sync.Mutex
}
func NewPubSub(conf *config.CGRConfig) *PubSub {
@@ -38,48 +42,72 @@ func NewPubSub(conf *config.CGRConfig) *PubSub {
conf: conf,
subscribers: make(map[string]map[string]time.Time),
pubFunc: utils.HttpJsonPost,
mux: &sync.Mutex{},
}
}
func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
if ps.subscribers[si.EventType] == nil {
ps.subscribers[si.EventType] = make(map[string]time.Time)
ps.mux.Lock()
defer ps.mux.Unlock()
if si.Transport != utils.META_HTTP_POST {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
if ps.subscribers[si.EventName] == nil {
ps.subscribers[si.EventName] = make(map[string]time.Time)
}
var expTime time.Time
if si.LiveDuration > 0 {
expTime = time.Now().Add(si.LiveDuration)
if si.LifeSpan > 0 {
expTime = time.Now().Add(si.LifeSpan)
}
ps.subscribers[si.EventType][si.PostUrl] = expTime
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime
*reply = utils.OK
return nil
}
func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
delete(ps.subscribers[si.EventType], si.PostUrl)
ps.mux.Lock()
defer ps.mux.Unlock()
if si.Transport != utils.META_HTTP_POST {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address))
*reply = utils.OK
return nil
}
func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
subs := ps.subscribers[pi.EventType]
for postURL, expTime := range subs {
ps.mux.Lock()
defer ps.mux.Unlock()
subs := ps.subscribers[pi.Event["EventName"]]
for transport_address, expTime := range subs {
split := utils.InfieldSplit(transport_address)
if len(split) != 2 {
engine.Logger.Warning("<PubSub> Wrong transport;address pair: " + transport_address)
continue
}
transport := split[0]
address := split[1]
if !expTime.IsZero() && expTime.Before(time.Now()) {
delete(subs, postURL)
delete(subs, transport_address)
continue // subscription expired, do not send event
}
url := postURL
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
engine.Logger.Warning(fmt.Sprintf("<PubSub> WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType))
break
switch transport {
case utils.META_HTTP_POST:
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
engine.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"]))
break
}
time.Sleep(delay())
}
time.Sleep(delay())
}
}()
}()
}
}
*reply = utils.OK
return nil
@@ -97,6 +125,12 @@ func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) {
return &ProxyPubSub{Client: client}, nil
}
func (ps *ProxyPubSub) Subscribe(sqID string, values *map[string]float64) error {
return ps.Client.Call("PubSub.Subscribe", sqID, values)
func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error {
return ps.Client.Call("PubSub.Subscribe", si, reply)
}
func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
return ps.Client.Call("PubSub.Unsubscribe", si, reply)
}
func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error {
return ps.Client.Call("PubSub.Publish", pi, reply)
}

View File

@@ -5,34 +5,50 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestSubscribe(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() {
if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() {
t.Error("Error adding subscriber: ", ps.subscribers)
}
}
func TestSubscribeNoTransport(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: "test",
Address: "url",
LifeSpan: time.Second,
}, &r); err == nil {
t.Error("Error subscribing error: ", err)
}
}
func TestSubscribeNoExpire(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: 0,
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 0,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() {
if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() {
t.Error("Error adding no expire subscriber: ", ps.subscribers)
}
}
@@ -41,15 +57,17 @@ func TestUnsubscribe(t *testing.T) {
ps := NewPubSub(nil)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Unsubscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
}, &r); err != nil {
t.Error("Error unsubscribing: ", err)
}
@@ -61,32 +79,33 @@ func TestUnsubscribe(t *testing.T) {
func TestPublish(t *testing.T) {
ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true})
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
obj.(map[string]string)["called"] = "yes"
obj.(map[string]string)["called"] = url
return nil, nil
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: time.Second,
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
m := make(map[string]string)
m["EventName"] = "test"
if err := ps.Publish(PublishInfo{
EventType: "test",
Event: m,
Event: m,
}, &r); err != nil {
t.Error("Error publishing: ", err)
}
for i := 0; i < 1000; i++ { // wait for the theread to populate map
if len(m) == 0 {
if len(m) == 1 {
time.Sleep(time.Microsecond)
} else {
break
}
}
if r, exists := m["called"]; !exists || r != "yes" {
if r, exists := m["called"]; !exists || r != "url" {
t.Error("Error calling publish function: ", m)
}
}
@@ -100,15 +119,15 @@ func TestPublishExpired(t *testing.T) {
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventType: "test",
PostUrl: "url",
LiveDuration: 1,
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 1,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Publish(PublishInfo{
EventType: "test",
Event: nil,
Event: map[string]string{"EventName": "test"},
}, &r); err != nil {
t.Error("Error publishing: ", err)
}

View File

@@ -274,6 +274,14 @@ func AccountAliasKey(tenant, account string) string {
return ConcatenatedKey(tenant, account)
}
func InfieldJoin(vals ...string) string {
return strings.Join(vals, INFIELD_SEP)
}
func InfieldSplit(val string) []string {
return strings.Split(val, INFIELD_SEP)
}
func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) {
body, err := json.Marshal(content)
if err != nil {