diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index f0f76dd07..1b57334df 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -11,7 +13,7 @@ import ( type SubscribeInfo struct { EventType string PostUrl string - LiveDuration time.Duartion + LiveDuration time.Duration } type PublishInfo struct { @@ -27,13 +29,15 @@ type PublishSubscriber interface { type PubSub struct { subscribers map[string]map[string]time.Time - conf *CGRConfig + conf *config.CGRConfig + pubFunc func(string, bool, interface{}) ([]byte, error) } -func NewPubSub(conf *CGRConfig) *PubSub { +func NewPubSub(conf *config.CGRConfig) *PubSub { return &PubSub{ conf: conf, subscribers: make(map[string]map[string]time.Time), + pubFunc: utils.HttpJsonPost, } } @@ -41,7 +45,11 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { if ps.subscribers[si.EventType] == nil { ps.subscribers[si.EventType] = make(map[string]time.Time) } - ps.subscribers[si.EventType][si.PostUrl] = time.Now().Add(si.LiveDuration) + var expTime time.Time + if si.LiveDuration > 0 { + expTime = time.Now().Add(si.LiveDuration) + } + ps.subscribers[si.EventType][si.PostUrl] = expTime *reply = utils.OK return nil } @@ -52,10 +60,10 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return nil } -func (ps *PubSub) Publish(pi PublishInfo, replay *string) error { +func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { subs := ps.subscribers[pi.EventType] for postURL, expTime := range subs { - if expTime.After(time.Now) { + if !expTime.IsZero() && expTime.Before(time.Now()) { delete(subs, postURL) continue // subscription expired, do not send event } @@ -63,16 +71,17 @@ func (ps *PubSub) Publish(pi PublishInfo, replay *string) error { go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err = utils.HttpJsonPost(url, ps.cfg.HttpSkipTlsVerify, pi.Event); err == nil { + if _, err := ps.pubFunc(url, ps.conf.HttpSkipTlsVerify, pi.Event); err == nil { break // Success, no need to reinterate } else if i == 4 { // Last iteration, syslog the warning - Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) + engine.Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], event type: %s", url, err.Error(), pi.EventType)) break } time.Sleep(delay()) } }() } + *reply = utils.OK return nil } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 000000000..681a2d105 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,118 @@ +package pubsub + +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/config" +) + +func TestSubscribe(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if expTime, exists := ps.subscribers["test"]["url"]; !exists || expTime.IsZero() { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestSubscribeNoExpire(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: 0, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if expTime, exists := ps.subscribers["test"]["url"]; !exists || !expTime.IsZero() { + t.Error("Error adding no expire subscriber: ", ps.subscribers) + } +} + +func TestUnsubscribe(t *testing.T) { + ps := NewPubSub(nil) + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Unsubscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + }, &r); err != nil { + t.Error("Error unsubscribing: ", err) + } + if _, exists := ps.subscribers["test"]["url"]; exists { + t.Error("Error adding subscriber: ", ps.subscribers) + } +} + +func TestPublish(t *testing.T) { + ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + obj.(map[string]string)["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: time.Second, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + m := make(map[string]string) + if err := ps.Publish(PublishInfo{ + EventType: "test", + Event: m, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + for i := 0; i < 1000; i++ { // wait for the theread to populate map + if len(m) == 0 { + time.Sleep(time.Microsecond) + } else { + break + } + } + if r, exists := m["called"]; !exists || r != "yes" { + t.Error("Error calling publish function: ", m) + } +} + +func TestPublishExpired(t *testing.T) { + ps := NewPubSub(&config.CGRConfig{HttpSkipTlsVerify: true}) + ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { + m := obj.(map[string]string) + m["called"] = "yes" + return nil, nil + } + var r string + if err := ps.Subscribe(SubscribeInfo{ + EventType: "test", + PostUrl: "url", + LiveDuration: 1, + }, &r); err != nil { + t.Error("Error subscribing: ", err) + } + if err := ps.Publish(PublishInfo{ + EventType: "test", + Event: nil, + }, &r); err != nil { + t.Error("Error publishing: ", err) + } + if len(ps.subscribers["test"]) != 0 { + t.Error("Error removing expired subscribers: ", ps.subscribers) + } +} diff --git a/test.sh b/test.sh index dbaf2189e..7c605b910 100755 --- a/test.sh +++ b/test.sh @@ -11,6 +11,7 @@ go test -i github.com/cgrates/cgrates/cdrc go test -i github.com/cgrates/cgrates/utils go test -i github.com/cgrates/cgrates/history go test -i github.com/cgrates/cgrates/cdre +go test -i github.com/cgrates/cgrates/pubsub go test github.com/cgrates/cgrates/apier/v1 v1=$? @@ -36,6 +37,7 @@ go test github.com/cgrates/cgrates/cache2go c2g=$? go test github.com/cgrates/cgrates/cdre cdre=$? +go test github.com/cgrates/cgrates/pubsub +ps=$? -exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre - +exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre && $ps