From 897c6fddf6ffd733c6cf15bde8e342e12bba841a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 2 Jul 2015 11:49:17 +0300 Subject: [PATCH] added filters in subscriber data --- console/show_subscribers.go | 4 ++-- engine/pubsub.go | 31 ++++++++++++++++++++++--------- engine/pubsub_test.go | 4 ++-- engine/storage_interface.go | 5 ++--- engine/storage_map.go | 10 +++++----- engine/storage_redis.go | 10 +++++----- 6 files changed, 38 insertions(+), 26 deletions(-) diff --git a/console/show_subscribers.go b/console/show_subscribers.go index b4591328d..7ddae5af0 100644 --- a/console/show_subscribers.go +++ b/console/show_subscribers.go @@ -18,7 +18,7 @@ along with this program. If not, see package console -import "time" +import "github.com/cgrates/cgrates/engine" func init() { c := &CmdShowSubscribers{ @@ -59,6 +59,6 @@ func (self *CmdShowSubscribers) PostprocessRpcParams() error { } func (self *CmdShowSubscribers) RpcResult() interface{} { - var s map[string]map[string]time.Time + var s map[string]map[string]*engine.SubscriberData return &s } diff --git a/engine/pubsub.go b/engine/pubsub.go index 65b9b3cc7..6784f423d 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -26,11 +26,16 @@ type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error Publish(PublishInfo, *string) error - ShowSubscribers(string, *map[string]map[string]time.Time) error + ShowSubscribers(string, *map[string]map[string]*SubscriberData) error +} + +type SubscriberData struct { + ExpTime time.Time + Filters utils.RSRFields } type PubSub struct { - subscribers map[string]map[string]time.Time + subscribers map[string]map[string]*SubscriberData ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex @@ -40,7 +45,7 @@ type PubSub struct { func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { ps := &PubSub{ ttlVerify: ttlVerify, - subscribers: make(map[string]map[string]time.Time), + subscribers: make(map[string]map[string]*SubscriberData), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, accountDb: accountDb, @@ -77,13 +82,21 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { return errors.New(*reply) } if ps.subscribers[si.EventName] == nil { - ps.subscribers[si.EventName] = make(map[string]time.Time) + ps.subscribers[si.EventName] = make(map[string]*SubscriberData) } var expTime time.Time if si.LifeSpan > 0 { expTime = time.Now().Add(si.LifeSpan) } - ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime + rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP) + if err != nil { + *reply = err.Error() + return err + } + ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{ + ExpTime: expTime, + Filters: rsr, + } ps.saveSubscribers(si.EventName) *reply = utils.OK return nil @@ -106,7 +119,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() subs := ps.subscribers[pi.Event["EventName"]] - for transportAddress, expTime := range subs { + for transportAddress, subData := range subs { split := utils.InfieldSplit(transportAddress) if len(split) != 2 { Logger.Warning(" Wrong transport;address pair: " + transportAddress) @@ -114,7 +127,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { } transport := split[0] address := split[1] - if !expTime.IsZero() && expTime.Before(time.Now()) { + if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { delete(subs, transportAddress) ps.saveSubscribers(pi.Event["EventName"]) continue // subscription expired, do not send event @@ -139,7 +152,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { return nil } -func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]time.Time) error { +func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error { *out = ps.subscribers return nil } @@ -166,6 +179,6 @@ func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { return ps.Client.Call("PubSubV1.Publish", pi, reply) } -func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]time.Time) error { +func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error { return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) } diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index d693414f0..6e027a7b8 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -18,7 +18,7 @@ func TestSubscribe(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() { + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() { t.Error("Error adding subscriber: ", ps.subscribers) } } @@ -64,7 +64,7 @@ func TestSubscribeNoExpire(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() { + if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() { t.Error("Error adding no expire subscriber: ", ps.subscribers) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 631d56f03..faa8fc560 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "encoding/json" "reflect" - "time" "github.com/cgrates/cgrates/utils" "github.com/ugorji/go/codec" @@ -80,8 +79,8 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error - GetPubSubSubscribers() (map[string]map[string]time.Time, error) - SetPubSubSubscribers(string, map[string]time.Time) error + GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error) + SetPubSubSubscribers(string, map[string]*SubscriberData) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index f64d00d73..b729bd102 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,19 +561,19 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { - result = make(map[string]map[string]time.Time) +func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { + result = make(map[string]map[string]*SubscriberData) for key, value := range ms.dict { if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { - subs := make(map[string]time.Time) - if err = ms.ms.Unmarshal(value, subs); err == nil { + subs := make(map[string]*SubscriberData) + if err = ms.ms.Unmarshal(value, &subs); err == nil { result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs } } } return } -func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { +func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { result, err := ms.ms.Marshal(subs) ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result return diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 0270133de..c571d7622 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -689,16 +689,16 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } -func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { +func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) { keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") if err != nil { return nil, err } - result = make(map[string]map[string]time.Time) + result = make(map[string]map[string]*SubscriberData) for _, key := range keys { if values, err := rs.db.Get(key); err == nil { - subs := make(map[string]time.Time) - err = rs.ms.Unmarshal(values, subs) + subs := make(map[string]*SubscriberData) + err = rs.ms.Unmarshal(values, &subs) result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs } else { return nil, utils.ErrNotFound @@ -707,7 +707,7 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]tim return } -func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { +func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) { result, err := rs.ms.Marshal(subs) rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) return