This commit is contained in:
DanB
2015-07-02 16:36:03 +02:00
6 changed files with 130 additions and 113 deletions

View File

@@ -59,6 +59,6 @@ func (self *CmdShowSubscribers) PostprocessRpcParams() error {
}
func (self *CmdShowSubscribers) RpcResult() interface{} {
var s map[string]map[string]*engine.SubscriberData
var s map[string]*engine.SubscriberData
return &s
}

View File

@@ -11,22 +11,27 @@ import (
)
type SubscribeInfo struct {
EventName string
EventFilter string
Transport string
Address string
LifeSpan time.Duration
}
type CgrEvent map[string]string
func (ce CgrEvent) Match(rsrFields utils.RSRFields) bool {
return true
}
type PublishInfo struct {
Event map[string]string
Event CgrEvent
}
type PublisherSubscriber interface {
Subscribe(SubscribeInfo, *string) error
Unsubscribe(SubscribeInfo, *string) error
Publish(PublishInfo, *string) error
ShowSubscribers(string, *map[string]map[string]*SubscriberData) error
ShowSubscribers(string, *map[string]*SubscriberData) error
}
type SubscriberData struct {
@@ -35,7 +40,7 @@ type SubscriberData struct {
}
type PubSub struct {
subscribers map[string]map[string]*SubscriberData
subscribers map[string]*SubscriberData
ttlVerify bool
pubFunc func(string, bool, interface{}) ([]byte, error)
mux *sync.Mutex
@@ -45,32 +50,31 @@ type PubSub struct {
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
ps := &PubSub{
ttlVerify: ttlVerify,
subscribers: make(map[string]map[string]*SubscriberData),
subscribers: make(map[string]*SubscriberData),
pubFunc: utils.HttpJsonPost,
mux: &sync.Mutex{},
accountDb: accountDb,
}
// load subscribers
if subs, err := accountDb.GetPubSubSubscribers(); err == nil {
if subs, err := accountDb.GetSubscribers(); err == nil {
ps.subscribers = subs
}
return ps
}
func (ps *PubSub) saveSubscribers(key string) {
if key != "" {
if _, found := ps.subscribers[key]; !found {
return
}
if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil {
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
}
} else { // save all
for key, valueMap := range ps.subscribers {
if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil {
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
}
}
func (ps *PubSub) saveSubscriber(key string) {
subData, found := ps.subscribers[key]
if !found {
return
}
if err := accountingStorage.SetSubscriber(key, subData); err != nil {
Logger.Err("<PubSub> Error saving subscriber: " + err.Error())
}
}
func (ps *PubSub) removeSubscriber(key string) {
if err := accountingStorage.RemoveSubscriber(key); err != nil {
Logger.Err("<PubSub> Error removing subscriber: " + err.Error())
}
}
@@ -81,9 +85,6 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
if ps.subscribers[si.EventName] == nil {
ps.subscribers[si.EventName] = make(map[string]*SubscriberData)
}
var expTime time.Time
if si.LifeSpan > 0 {
expTime = time.Now().Add(si.LifeSpan)
@@ -93,11 +94,12 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
*reply = err.Error()
return err
}
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{
key := utils.InfieldJoin(si.Transport, si.Address)
ps.subscribers[key] = &SubscriberData{
ExpTime: expTime,
Filters: rsr,
}
ps.saveSubscribers(si.EventName)
ps.saveSubscriber(key)
*reply = utils.OK
return nil
}
@@ -109,8 +111,9 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address))
ps.saveSubscribers(si.EventName)
key := utils.InfieldJoin(si.Transport, si.Address)
delete(ps.subscribers, key)
ps.removeSubscriber(key)
*reply = utils.OK
return nil
}
@@ -118,20 +121,23 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
ps.mux.Lock()
defer ps.mux.Unlock()
subs := ps.subscribers[pi.Event["EventName"]]
for transportAddress, subData := range subs {
split := utils.InfieldSplit(transportAddress)
for key, subData := range ps.subscribers {
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
delete(ps.subscribers, key)
ps.removeSubscriber(key)
continue // subscription expired, do not send event
}
if !pi.Event.Match(subData.Filters) {
continue // the event does not match the filters
}
split := utils.InfieldSplit(key)
if len(split) != 2 {
Logger.Warning("<PubSub> Wrong transport;address pair: " + transportAddress)
Logger.Warning("<PubSub> Wrong transport;address pair: " + key)
continue
}
transport := split[0]
address := split[1]
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
delete(subs, transportAddress)
ps.saveSubscribers(pi.Event["EventName"])
continue // subscription expired, do not send event
}
switch transport {
case utils.META_HTTP_POST:
go func() {
@@ -152,7 +158,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
return nil
}
func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error {
func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) error {
*out = ps.subscribers
return nil
}
@@ -179,6 +185,6 @@ func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error {
return ps.Client.Call("PubSubV1.Publish", pi, reply)
}
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error {
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error {
return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply)
}

View File

@@ -11,14 +11,14 @@ func TestSubscribe(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() {
if subData, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() {
t.Error("Error adding subscriber: ", ps.subscribers)
}
}
@@ -27,15 +27,15 @@ func TestSubscribeSave(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
subs, err := accountingStorage.GetPubSubSubscribers()
if err != nil || len(subs["test"]) != 1 {
subs, err := accountingStorage.GetSubscribers()
if err != nil || len(subs) != 1 {
t.Error("Error saving subscribers: ", err, subs)
}
}
@@ -44,10 +44,10 @@ func TestSubscribeNoTransport(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: "test",
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: "test",
Address: "url",
LifeSpan: time.Second,
}, &r); err == nil {
t.Error("Error subscribing error: ", err)
}
@@ -57,14 +57,14 @@ func TestSubscribeNoExpire(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 0,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 0,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() {
if subData, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() {
t.Error("Error adding no expire subscriber: ", ps.subscribers)
}
}
@@ -73,21 +73,21 @@ func TestUnsubscribe(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Unsubscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
}, &r); err != nil {
t.Error("Error unsubscribing: ", err)
}
if _, exists := ps.subscribers["test"]["url"]; exists {
if _, exists := ps.subscribers[utils.InfieldJoin(utils.META_HTTP_POST, "url")]; exists {
t.Error("Error adding subscriber: ", ps.subscribers)
}
}
@@ -96,22 +96,22 @@ func TestUnsubscribeSave(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Unsubscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
}, &r); err != nil {
t.Error("Error unsubscribing: ", err)
}
subs, err := accountingStorage.GetPubSubSubscribers()
if err != nil || len(subs["test"]) != 0 {
subs, err := accountingStorage.GetSubscribers()
if err != nil || len(subs) != 0 {
t.Error("Error saving subscribers: ", err, subs)
}
}
@@ -119,20 +119,20 @@ func TestUnsubscribeSave(t *testing.T) {
func TestPublish(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
obj.(map[string]string)["called"] = url
obj.(CgrEvent)["called"] = url
return nil, nil
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
m := make(map[string]string)
m["EventName"] = "test"
m["EventFilter"] = "test"
if err := ps.Publish(PublishInfo{
Event: m,
}, &r); err != nil {
@@ -159,19 +159,19 @@ func TestPublishExpired(t *testing.T) {
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 1,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 1,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Publish(PublishInfo{
Event: map[string]string{"EventName": "test"},
Event: map[string]string{"EventFilter": "test"},
}, &r); err != nil {
t.Error("Error publishing: ", err)
}
if len(ps.subscribers["test"]) != 0 {
if len(ps.subscribers) != 0 {
t.Error("Error removing expired subscribers: ", ps.subscribers)
}
}
@@ -185,24 +185,24 @@ func TestPublishExpiredSave(t *testing.T) {
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventName: "test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 1,
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: 1,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
subs, err := accountingStorage.GetPubSubSubscribers()
if err != nil || len(subs["test"]) != 1 {
subs, err := accountingStorage.GetSubscribers()
if err != nil || len(subs) != 1 {
t.Error("Error saving subscribers: ", err, subs)
}
if err := ps.Publish(PublishInfo{
Event: map[string]string{"EventName": "test"},
Event: map[string]string{"EventFilter": "test"},
}, &r); err != nil {
t.Error("Error publishing: ", err)
}
subs, err = accountingStorage.GetPubSubSubscribers()
if err != nil || len(subs["test"]) != 0 {
subs, err = accountingStorage.GetSubscribers()
if err != nil || len(subs) != 0 {
t.Error("Error saving subscribers: ", err, subs)
}
}

View File

@@ -79,8 +79,9 @@ type AccountingStorage interface {
SetAccount(*Account) error
GetCdrStatsQueue(string) (*StatsQueue, error)
SetCdrStatsQueue(*StatsQueue) error
GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error)
SetPubSubSubscribers(string, map[string]*SubscriberData) error
GetSubscribers() (map[string]*SubscriberData, error)
SetSubscriber(string, *SubscriberData) error
RemoveSubscriber(string) error
}
type CdrStorage interface {

View File

@@ -561,24 +561,29 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
return
}
func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
result = make(map[string]map[string]*SubscriberData)
func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
result = make(map[string]*SubscriberData)
for key, value := range ms.dict {
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
subs := make(map[string]*SubscriberData)
if err = ms.ms.Unmarshal(value, &subs); err == nil {
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
sub := &SubscriberData{}
if err = ms.ms.Unmarshal(value, sub); err == nil {
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub
}
}
}
return
}
func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
result, err := ms.ms.Marshal(subs)
func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
result, err := ms.ms.Marshal(sub)
ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result
return
}
func (ms *MapStorage) RemoveSubscriber(key string) (err error) {
delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key)
return
}
func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &ats)

View File

@@ -689,17 +689,17 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
return
}
func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*")
if err != nil {
return nil, err
}
result = make(map[string]map[string]*SubscriberData)
result = make(map[string]*SubscriberData)
for _, key := range keys {
if values, err := rs.db.Get(key); err == nil {
subs := make(map[string]*SubscriberData)
err = rs.ms.Unmarshal(values, &subs)
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
sub := &SubscriberData{}
err = rs.ms.Unmarshal(values, sub)
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub
} else {
return nil, utils.ErrNotFound
}
@@ -707,12 +707,17 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*Su
return
}
func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
result, err := rs.ms.Marshal(subs)
func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
result, err := rs.ms.Marshal(sub)
rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result)
return
}
func (rs *RedisStorage) RemoveSubscriber(key string) (err error) {
rs.db.Del(utils.PUBSUB_SUBSCRIBERS_PREFIX + key)
return
}
func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
var values []byte
if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {