diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 1b57334df..ce4b05886 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -1,7 +1,9 @@ package pubsub import ( + "errors" "fmt" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -11,17 +13,18 @@ import ( ) type SubscribeInfo struct { - EventType string - PostUrl string - LiveDuration time.Duration + EventName string + EventFilter string + Transport string + Address string + LifeSpan time.Duration } type PublishInfo struct { - EventType string - Event map[string]string + Event map[string]string } -type PublishSubscriber interface { +type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error @@ -31,6 +34,7 @@ type PubSub struct { subscribers map[string]map[string]time.Time conf *config.CGRConfig pubFunc func(string, bool, interface{}) ([]byte, error) + mux *sync.Mutex } func NewPubSub(conf *config.CGRConfig) *PubSub { @@ -38,48 +42,72 @@ func NewPubSub(conf *config.CGRConfig) *PubSub { conf: conf, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, + mux: &sync.Mutex{}, } } func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { - if ps.subscribers[si.EventType] == nil { - ps.subscribers[si.EventType] = make(map[string]time.Time) + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + if ps.subscribers[si.EventName] == nil { + ps.subscribers[si.EventName] = make(map[string]time.Time) } var expTime time.Time - if si.LiveDuration > 0 { - expTime = time.Now().Add(si.LiveDuration) + if si.LifeSpan > 0 { + expTime = time.Now().Add(si.LifeSpan) } - ps.subscribers[si.EventType][si.PostUrl] = expTime + ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime *reply = utils.OK return nil } func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { - delete(ps.subscribers[si.EventType], si.PostUrl) + ps.mux.Lock() + defer ps.mux.Unlock() + if si.Transport != utils.META_HTTP_POST { + *reply = "Unsupported transport type" + return errors.New(*reply) + } + delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) *reply = utils.OK return nil } func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { - subs := ps.subscribers[pi.EventType] - for postURL, expTime := range subs { + ps.mux.Lock() + defer ps.mux.Unlock() + subs := ps.subscribers[pi.Event["EventName"]] + for transport_address, expTime := range subs { + split := utils.InfieldSplit(transport_address) + if len(split) != 2 { + engine.Logger.Warning(" Wrong transport;address pair: " + transport_address) + continue + } + transport := split[0] + address := split[1] if !expTime.IsZero() && expTime.Before(time.Now()) { - delete(subs, postURL) + delete(subs, transport_address) continue // subscription expired, do not send event } - url := postURL - go func() { - delay := utils.Fib() - for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { - break // Success, no need to reinterate - } else if i == 4 { // Last iteration, syslog the warning - engine.Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) - break + switch transport { + case utils.META_HTTP_POST: + go func() { + delay := utils.Fib() + for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort + if _, err := ps.pubFunc(address, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { + break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + engine.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + break + } + time.Sleep(delay()) } - time.Sleep(delay()) - } - }() + }() + } } *reply = utils.OK return nil @@ -97,6 +125,12 @@ func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) { return &ProxyPubSub{Client: client}, nil } -func (ps *ProxyPubSub) Subscribe(sqID string, values *map[string]float64) error { - return ps.Client.Call("PubSub.Subscribe", sqID, values) +func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSub.Subscribe", si, reply) +} +func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { + return ps.Client.Call("PubSub.Unsubscribe", si, reply) +} +func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { + return ps.Client.Call("PubSub.Publish", pi, reply) } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 681a2d105..74c964dba 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -5,34 +5,50 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" ) func TestSubscribe(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() { + if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() { t.Error("Error adding subscriber: ", ps.subscribers) } } +func TestSubscribeNoTransport(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventName: "test", + Transport: "test", + Address: "url", + LifeSpan: time.Second, + }, &r); err == nil { + t.Error("Error subscribing error: ", err) + } +} + func TestSubscribeNoExpire(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: 0, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 0, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() { + if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() { t.Error("Error adding no expire subscriber: ", ps.subscribers) } } @@ -41,15 +57,17 @@ func TestUnsubscribe(t *testing.T) { ps := NewPubSub(nil) var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Unsubscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", }, &r); err != nil { t.Error("Error unsubscribing: ", err) } @@ -61,32 +79,33 @@ func TestUnsubscribe(t *testing.T) { func TestPublish(t *testing.T) { ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - obj.(map[string]string)["called"] = "yes" + obj.(map[string]string)["called"] = url return nil, nil } var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: time.Second, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } m := make(map[string]string) + m["EventName"] = "test" if err := ps.Publish(PublishInfo{ - EventType: "test", - Event: m, + Event: m, }, &r); err != nil { t.Error("Error publishing: ", err) } for i := 0; i < 1000; i++ { // wait for the theread to populate map - if len(m) == 0 { + if len(m) == 1 { time.Sleep(time.Microsecond) } else { break } } - if r, exists := m["called"]; !exists || r != "yes" { + if r, exists := m["called"]; !exists || r != "url" { t.Error("Error calling publish function: ", m) } } @@ -100,15 +119,15 @@ func TestPublishExpired(t *testing.T) { } var r string if err := ps.Subscribe(SubscribeInfo{ - EventType: "test", - PostUrl: "url", - LiveDuration: 1, + EventName: "test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Publish(PublishInfo{ - EventType: "test", - Event: nil, + Event: map[string]string{"EventName": "test"}, }, &r); err != nil { t.Error("Error publishing: ", err) } diff --git a/utils/coreutils.go b/utils/coreutils.go index de3c36194..1788aeab4 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -274,6 +274,14 @@ func AccountAliasKey(tenant, account string) string { return ConcatenatedKey(tenant, account) } +func InfieldJoin(vals ...string) string { + return strings.Join(vals, INFIELD_SEP) +} + +func InfieldSplit(val string) []string { + return strings.Split(val, INFIELD_SEP) +} + func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) { body, err := json.Marshal(content) if err != nil {