mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
added filters in subscriber data
This commit is contained in:
@@ -18,7 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package console
|
||||
|
||||
import "time"
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
|
||||
func init() {
|
||||
c := &CmdShowSubscribers{
|
||||
@@ -59,6 +59,6 @@ func (self *CmdShowSubscribers) PostprocessRpcParams() error {
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) RpcResult() interface{} {
|
||||
var s map[string]map[string]time.Time
|
||||
var s map[string]map[string]*engine.SubscriberData
|
||||
return &s
|
||||
}
|
||||
|
||||
@@ -26,11 +26,16 @@ type PublisherSubscriber interface {
|
||||
Subscribe(SubscribeInfo, *string) error
|
||||
Unsubscribe(SubscribeInfo, *string) error
|
||||
Publish(PublishInfo, *string) error
|
||||
ShowSubscribers(string, *map[string]map[string]time.Time) error
|
||||
ShowSubscribers(string, *map[string]map[string]*SubscriberData) error
|
||||
}
|
||||
|
||||
type SubscriberData struct {
|
||||
ExpTime time.Time
|
||||
Filters utils.RSRFields
|
||||
}
|
||||
|
||||
type PubSub struct {
|
||||
subscribers map[string]map[string]time.Time
|
||||
subscribers map[string]map[string]*SubscriberData
|
||||
ttlVerify bool
|
||||
pubFunc func(string, bool, interface{}) ([]byte, error)
|
||||
mux *sync.Mutex
|
||||
@@ -40,7 +45,7 @@ type PubSub struct {
|
||||
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
|
||||
ps := &PubSub{
|
||||
ttlVerify: ttlVerify,
|
||||
subscribers: make(map[string]map[string]time.Time),
|
||||
subscribers: make(map[string]map[string]*SubscriberData),
|
||||
pubFunc: utils.HttpJsonPost,
|
||||
mux: &sync.Mutex{},
|
||||
accountDb: accountDb,
|
||||
@@ -77,13 +82,21 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
|
||||
return errors.New(*reply)
|
||||
}
|
||||
if ps.subscribers[si.EventName] == nil {
|
||||
ps.subscribers[si.EventName] = make(map[string]time.Time)
|
||||
ps.subscribers[si.EventName] = make(map[string]*SubscriberData)
|
||||
}
|
||||
var expTime time.Time
|
||||
if si.LifeSpan > 0 {
|
||||
expTime = time.Now().Add(si.LifeSpan)
|
||||
}
|
||||
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime
|
||||
rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{
|
||||
ExpTime: expTime,
|
||||
Filters: rsr,
|
||||
}
|
||||
ps.saveSubscribers(si.EventName)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
@@ -106,7 +119,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
ps.mux.Lock()
|
||||
defer ps.mux.Unlock()
|
||||
subs := ps.subscribers[pi.Event["EventName"]]
|
||||
for transportAddress, expTime := range subs {
|
||||
for transportAddress, subData := range subs {
|
||||
split := utils.InfieldSplit(transportAddress)
|
||||
if len(split) != 2 {
|
||||
Logger.Warning("<PubSub> Wrong transport;address pair: " + transportAddress)
|
||||
@@ -114,7 +127,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
}
|
||||
transport := split[0]
|
||||
address := split[1]
|
||||
if !expTime.IsZero() && expTime.Before(time.Now()) {
|
||||
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
|
||||
delete(subs, transportAddress)
|
||||
ps.saveSubscribers(pi.Event["EventName"])
|
||||
continue // subscription expired, do not send event
|
||||
@@ -139,7 +152,7 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]time.Time) error {
|
||||
func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error {
|
||||
*out = ps.subscribers
|
||||
return nil
|
||||
}
|
||||
@@ -166,6 +179,6 @@ func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
return ps.Client.Call("PubSubV1.Publish", pi, reply)
|
||||
}
|
||||
|
||||
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]time.Time) error {
|
||||
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error {
|
||||
return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply)
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestSubscribe(t *testing.T) {
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || expTime.IsZero() {
|
||||
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() {
|
||||
t.Error("Error adding subscriber: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func TestSubscribeNoExpire(t *testing.T) {
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if expTime, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !expTime.IsZero() {
|
||||
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() {
|
||||
t.Error("Error adding no expire subscriber: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/ugorji/go/codec"
|
||||
@@ -80,8 +79,8 @@ type AccountingStorage interface {
|
||||
SetAccount(*Account) error
|
||||
GetCdrStatsQueue(string) (*StatsQueue, error)
|
||||
SetCdrStatsQueue(*StatsQueue) error
|
||||
GetPubSubSubscribers() (map[string]map[string]time.Time, error)
|
||||
SetPubSubSubscribers(string, map[string]time.Time) error
|
||||
GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error)
|
||||
SetPubSubSubscribers(string, map[string]*SubscriberData) error
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
|
||||
@@ -561,19 +561,19 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) {
|
||||
result = make(map[string]map[string]time.Time)
|
||||
func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
|
||||
result = make(map[string]map[string]*SubscriberData)
|
||||
for key, value := range ms.dict {
|
||||
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
|
||||
subs := make(map[string]time.Time)
|
||||
if err = ms.ms.Unmarshal(value, subs); err == nil {
|
||||
subs := make(map[string]*SubscriberData)
|
||||
if err = ms.ms.Unmarshal(value, &subs); err == nil {
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) {
|
||||
func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
|
||||
result, err := ms.ms.Marshal(subs)
|
||||
ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result
|
||||
return
|
||||
|
||||
@@ -689,16 +689,16 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) {
|
||||
func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
|
||||
keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = make(map[string]map[string]time.Time)
|
||||
result = make(map[string]map[string]*SubscriberData)
|
||||
for _, key := range keys {
|
||||
if values, err := rs.db.Get(key); err == nil {
|
||||
subs := make(map[string]time.Time)
|
||||
err = rs.ms.Unmarshal(values, subs)
|
||||
subs := make(map[string]*SubscriberData)
|
||||
err = rs.ms.Unmarshal(values, &subs)
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
} else {
|
||||
return nil, utils.ErrNotFound
|
||||
@@ -707,7 +707,7 @@ func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]tim
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) {
|
||||
func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
|
||||
result, err := rs.ms.Marshal(subs)
|
||||
rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user