diff --git a/console/show_subscribers.go b/console/show_subscribers.go index 7ddae5af0..6a27adfa7 100644 --- a/console/show_subscribers.go +++ b/console/show_subscribers.go @@ -59,6 +59,6 @@ func (self *CmdShowSubscribers) PostprocessRpcParams() error { } func (self *CmdShowSubscribers) RpcResult() interface{} { - var s map[string]map[string]*engine.SubscriberData + var s map[string]*engine.SubscriberData return &s } diff --git a/engine/pubsub.go b/engine/pubsub.go index 6784f423d..0a1116b69 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -11,22 +11,27 @@ import ( ) type SubscribeInfo struct { - EventName string EventFilter string Transport string Address string LifeSpan time.Duration } +type CgrEvent map[string]string + +func (ce CgrEvent) Match(rsrFields utils.RSRFields) bool { + return true +} + type PublishInfo struct { - Event map[string]string + Event CgrEvent } type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error - ShowSubscribers(string, *map[string]map[string]*SubscriberData) error + ShowSubscribers(string, *map[string]*SubscriberData) error } type SubscriberData struct { @@ -35,7 +40,7 @@ type SubscriberData struct { } type PubSub struct { - subscribers map[string]map[string]*SubscriberData + subscribers map[string]*SubscriberData ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex @@ -45,32 +50,31 @@ type PubSub struct { func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { ps := &PubSub{ ttlVerify: ttlVerify, - subscribers: make(map[string]map[string]*SubscriberData), + subscribers: make(map[string]*SubscriberData), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, accountDb: accountDb, } // load subscribers - if subs, err := accountDb.GetPubSubSubscribers(); err == nil { + if subs, err := accountDb.GetSubscribers(); err == nil { ps.subscribers = subs } return ps } -func (ps *PubSub) saveSubscribers(key string) { - if key != "" { - if _, found := ps.subscribers[key]; !found { - return - } - if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil { - Logger.Err(" Error saving subscribers: " + err.Error()) - } - } else { // save all - for key, valueMap := range ps.subscribers { - if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil { - Logger.Err(" Error saving subscribers: " + err.Error()) - } - } +func (ps *PubSub) saveSubscriber(key string) { + subData, found := ps.subscribers[key] + if !found { + return + } + if err := accountingStorage.SetSubscriber(key, subData); err != nil { + Logger.Err(" Error saving subscriber: " + err.Error()) + } +} + +func (ps *PubSub) removeSubscriber(key string) { + if err := accountingStorage.RemoveSubscriber(key); err != nil { + Logger.Err(" Error removing subscriber: " + err.Error()) } } @@ -81,9 +85,6 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { *reply = "Unsupported transport type" return errors.New(*reply) } - if ps.subscribers[si.EventName] == nil { - ps.subscribers[si.EventName] = make(map[string]*SubscriberData) - } var expTime time.Time if si.LifeSpan > 0 { expTime = time.Now().Add(si.LifeSpan) @@ -93,11 +94,12 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { *reply = err.Error() return err } - ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{ + key := utils.InfieldJoin(si.Transport, si.Address) + ps.subscribers[key] = &SubscriberData{ ExpTime: expTime, Filters: rsr, } - ps.saveSubscribers(si.EventName) + ps.saveSubscriber(key) *reply = utils.OK return nil } @@ -109,8 +111,9 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { *reply = "Unsupported transport type" return errors.New(*reply) } - delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) - ps.saveSubscribers(si.EventName) + key := utils.InfieldJoin(si.Transport, si.Address) + delete(ps.subscribers, key) + ps.removeSubscriber(key) *reply = utils.OK return nil } @@ -118,20 +121,23 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() - subs := ps.subscribers[pi.Event["EventName"]] - for transportAddress, subData := range subs { - split := utils.InfieldSplit(transportAddress) + for key, subData := range ps.subscribers { + if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { + delete(ps.subscribers, key) + ps.removeSubscriber(key) + continue // subscription expired, do not send event + } + if !pi.Event.Match(subData.Filters) { + continue // the event does not match the filters + } + split := utils.InfieldSplit(key) if len(split) != 2 { - Logger.Warning(" Wrong transport;address pair: " + transportAddress) + Logger.Warning(" Wrong transport;address pair: " + key) continue } transport := split[0] address := split[1] - if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { - delete(subs, transportAddress) - ps.saveSubscribers(pi.Event["EventName"]) - continue // subscription expired, do not send event - } + switch transport { case utils.META_HTTP_POST: go func() { @@ -152,7 +158,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { return nil } -func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error { +func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) error { *out = ps.subscribers return nil } @@ -179,6 +185,6 @@ func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { return ps.Client.Call("PubSubV1.Publish", pi, reply) } -func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error { +func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error { return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) } diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index 6e027a7b8..21935ead8 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -11,14 +11,14 @@ func TestSubscribe(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() { + if subData, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() { t.Error("Error adding subscriber: ", ps.subscribers) } } @@ -27,15 +27,15 @@ func TestSubscribeSave(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } - subs, err := accountingStorage.GetPubSubSubscribers() - if err != nil || len(subs["test"]) != 1 { + subs, err := accountingStorage.GetSubscribers() + if err != nil || len(subs) != 1 { t.Error("Error saving subscribers: ", err, subs) } } @@ -44,10 +44,10 @@ func TestSubscribeNoTransport(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: "test", - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: "test", + Address: "url", + LifeSpan: time.Second, }, &r); err == nil { t.Error("Error subscribing error: ", err) } @@ -57,14 +57,14 @@ func TestSubscribeNoExpire(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: 0, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 0, }, &r); err != nil { t.Error("Error subscribing: ", err) } - if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() { + if subData, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() { t.Error("Error adding no expire subscriber: ", ps.subscribers) } } @@ -73,21 +73,21 @@ func TestUnsubscribe(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Unsubscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", }, &r); err != nil { t.Error("Error unsubscribing: ", err) } - if _, exists := ps.subscribers["test"]["url"]; exists { + if _, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; exists { t.Error("Error adding subscriber: ", ps.subscribers) } } @@ -96,22 +96,22 @@ func TestUnsubscribeSave(t *testing.T) { ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Unsubscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", }, &r); err != nil { t.Error("Error unsubscribing: ", err) } - subs, err := accountingStorage.GetPubSubSubscribers() - if err != nil || len(subs["test"]) != 0 { + subs, err := accountingStorage.GetSubscribers() + if err != nil || len(subs) != 0 { t.Error("Error saving subscribers: ", err, subs) } } @@ -119,20 +119,20 @@ func TestUnsubscribeSave(t *testing.T) { func TestPublish(t *testing.T) { ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - obj.(map[string]string)["called"] = url + obj.(CgrEvent)["called"] = url return nil, nil } var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: time.Second, }, &r); err != nil { t.Error("Error subscribing: ", err) } m := make(map[string]string) - m["EventName"] = "test" + m["EventFilter"] = "test" if err := ps.Publish(PublishInfo{ Event: m, }, &r); err != nil { @@ -159,19 +159,19 @@ func TestPublishExpired(t *testing.T) { } var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: 1, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, }, &r); err != nil { t.Error("Error subscribing: ", err) } if err := ps.Publish(PublishInfo{ - Event: map[string]string{"EventName": "test"}, + Event: map[string]string{"EventFilter": "test"}, }, &r); err != nil { t.Error("Error publishing: ", err) } - if len(ps.subscribers["test"]) != 0 { + if len(ps.subscribers) != 0 { t.Error("Error removing expired subscribers: ", ps.subscribers) } } @@ -185,24 +185,24 @@ func TestPublishExpiredSave(t *testing.T) { } var r string if err := ps.Subscribe(SubscribeInfo{ - EventName: "test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: 1, + EventFilter: "EventName/test", + Transport: utils.META_HTTP_POST, + Address: "url", + LifeSpan: 1, }, &r); err != nil { t.Error("Error subscribing: ", err) } - subs, err := accountingStorage.GetPubSubSubscribers() - if err != nil || len(subs["test"]) != 1 { + subs, err := accountingStorage.GetSubscribers() + if err != nil || len(subs) != 1 { t.Error("Error saving subscribers: ", err, subs) } if err := ps.Publish(PublishInfo{ - Event: map[string]string{"EventName": "test"}, + Event: map[string]string{"EventFilter": "test"}, }, &r); err != nil { t.Error("Error publishing: ", err) } - subs, err = accountingStorage.GetPubSubSubscribers() - if err != nil || len(subs["test"]) != 0 { + subs, err = accountingStorage.GetSubscribers() + if err != nil || len(subs) != 0 { t.Error("Error saving subscribers: ", err, subs) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index faa8fc560..9e7a0a0b1 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -79,8 +79,9 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error - GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error) - SetPubSubSubscribers(string, map[string]*SubscriberData) error + GetSubscribers() (map[string]*SubscriberData, error) + SetSubscriber(string, *SubscriberData) error + RemoveSubscriber(string) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index b729bd102..61d98e145 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,24 +561,29 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { - result = make(map[string]map[string]*SubscriberData) +func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { + result = make(map[string]*SubscriberData) for key, value := range ms.dict { if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { - subs := make(map[string]*SubscriberData) - if err = ms.ms.Unmarshal(value, &subs); err == nil { - result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + sub := &SubscriberData{} + if err = ms.ms.Unmarshal(value, sub); err == nil { + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub } } } return } -func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { - result, err := ms.ms.Marshal(subs) +func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { + result, err := ms.ms.Marshal(sub) ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result return } +func (ms *MapStorage) RemoveSubscriber(key string) (err error) { + delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key) + return +} + func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) { if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c571d7622..c53a0c679 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -689,17 +689,17 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { +func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") if err != nil { return nil, err } - result = make(map[string]map[string]*SubscriberData) + result = make(map[string]*SubscriberData) for _, key := range keys { if values, err := rs.db.Get(key); err == nil { - subs := make(map[string]*SubscriberData) - err = rs.ms.Unmarshal(values, &subs) - result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + sub := &SubscriberData{} + err = rs.ms.Unmarshal(values, sub) + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub } else { return nil, utils.ErrNotFound } @@ -707,12 +707,17 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*Su return } -func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { - result, err := rs.ms.Marshal(subs) +func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { + result, err := rs.ms.Marshal(sub) rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) return } +func (rs *RedisStorage) RemoveSubscriber(key string) (err error) { + rs.db.Del(utils.PUBSUB_SUBSCRIBERS_PREFIX + key) + return +} + func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) { var values []byte if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {