mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add Get/Set/Remove Subscriber in DataManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
310469b3b7
commit
d36eaf166b
@@ -765,3 +765,15 @@ func (dm *DataManager) GetUsers() (result []*UserProfile, err error) {
|
||||
func (dm *DataManager) RemoveUser(key string) error {
|
||||
return dm.DataDB().RemoveUserDrv(key)
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetSubscribers() (result map[string]*SubscriberData, err error) {
|
||||
return dm.DataDB().GetSubscribersDrv()
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetSubscriber(key string, sub *SubscriberData) (err error) {
|
||||
return dm.DataDB().SetSubscriberDrv(key, sub)
|
||||
}
|
||||
|
||||
func (dm *DataManager) RemoveSubscriber(key string) (err error) {
|
||||
return dm.DataDB().RemoveSubscriberDrv(key)
|
||||
}
|
||||
|
||||
@@ -1607,7 +1607,7 @@ func testOnStorITCRUDCdrStatsQueue(t *testing.T) {
|
||||
}
|
||||
|
||||
func testOnStorITCRUDSubscribers(t *testing.T) {
|
||||
if sbs, err := onStor.DataDB().GetSubscribers(); err != nil {
|
||||
if sbs, err := onStor.GetSubscribers(); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(sbs) != 0 {
|
||||
t.Errorf("Received subscribers: %+v", sbs)
|
||||
@@ -1616,18 +1616,18 @@ func testOnStorITCRUDSubscribers(t *testing.T) {
|
||||
ExpTime: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
Filters: utils.ParseRSRFieldsMustCompile("^*default", utils.INFIELD_SEP)}
|
||||
sbscID := "testOnStorITCRUDSubscribers"
|
||||
if err := onStor.DataDB().SetSubscriber(sbscID, sbsc); err != nil {
|
||||
if err := onStor.SetSubscriber(sbscID, sbsc); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetSubscribers(); err != nil {
|
||||
if rcv, err := onStor.GetSubscribers(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(sbsc.ExpTime, rcv[sbscID].ExpTime) { // Test just ExpTime since RSRField is more complex behind
|
||||
t.Errorf("Expecting: %v, received: %v", sbsc, rcv[sbscID])
|
||||
}
|
||||
if err := onStor.DataDB().RemoveSubscriber(sbscID); err != nil {
|
||||
if err := onStor.RemoveSubscriber(sbscID); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if sbs, err := onStor.DataDB().GetSubscribers(); err != nil {
|
||||
if sbs, err := onStor.GetSubscribers(); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(sbs) != 0 {
|
||||
t.Errorf("Received subscribers: %+v", sbs)
|
||||
|
||||
@@ -69,7 +69,7 @@ func NewPubSub(dm *DataManager, ttlVerify bool) (*PubSub, error) {
|
||||
dm: dm,
|
||||
}
|
||||
// load subscribers
|
||||
if subs, err := dm.DataDB().GetSubscribers(); err != nil {
|
||||
if subs, err := dm.GetSubscribers(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
ps.subscribers = subs
|
||||
@@ -87,13 +87,13 @@ func (ps *PubSub) saveSubscriber(key string) {
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
if err := dm.DataDB().SetSubscriber(key, subData); err != nil {
|
||||
if err := dm.SetSubscriber(key, subData); err != nil {
|
||||
utils.Logger.Err("<PubSub> Error saving subscriber: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PubSub) removeSubscriber(key string) {
|
||||
if err := dm.DataDB().RemoveSubscriber(key); err != nil {
|
||||
if err := dm.RemoveSubscriber(key); err != nil {
|
||||
utils.Logger.Err("<PubSub> Error removing subscriber: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ func TestSubscribeSave(t *testing.T) {
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
subs, err := dm.DataDB().GetSubscribers()
|
||||
subs, err := dm.GetSubscribers()
|
||||
if err != nil || len(subs) != 1 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
@@ -145,7 +145,7 @@ func TestUnsubscribeSave(t *testing.T) {
|
||||
}, &r); err != nil {
|
||||
t.Error("Error unsubscribing: ", err)
|
||||
}
|
||||
subs, err := dm.DataDB().GetSubscribers()
|
||||
subs, err := dm.GetSubscribers()
|
||||
if err != nil || len(subs) != 0 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
@@ -193,14 +193,14 @@ func TestPublishExpiredSave(t *testing.T) {
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
subs, err := dm.DataDB().GetSubscribers()
|
||||
subs, err := dm.GetSubscribers()
|
||||
if err != nil || len(subs) != 1 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil {
|
||||
t.Error("Error publishing: ", err)
|
||||
}
|
||||
subs, err = dm.DataDB().GetSubscribers()
|
||||
subs, err = dm.GetSubscribers()
|
||||
if err != nil || len(subs) != 0 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
|
||||
@@ -87,9 +87,9 @@ type DataDB interface {
|
||||
RemoveAccount(string) error
|
||||
GetCdrStatsQueue(string) (*CDRStatsQueue, error)
|
||||
SetCdrStatsQueue(*CDRStatsQueue) error
|
||||
GetSubscribers() (map[string]*SubscriberData, error)
|
||||
SetSubscriber(string, *SubscriberData) error
|
||||
RemoveSubscriber(string) error
|
||||
GetSubscribersDrv() (map[string]*SubscriberData, error)
|
||||
SetSubscriberDrv(string, *SubscriberData) error
|
||||
RemoveSubscriberDrv(string) error
|
||||
SetUserDrv(*UserProfile) error
|
||||
GetUserDrv(string) (*UserProfile, error)
|
||||
GetUsersDrv() ([]*UserProfile, error)
|
||||
|
||||
@@ -560,7 +560,7 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
|
||||
func (ms *MapStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
result = make(map[string]*SubscriberData)
|
||||
@@ -574,7 +574,7 @@ func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err e
|
||||
}
|
||||
return
|
||||
}
|
||||
func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
|
||||
func (ms *MapStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(sub)
|
||||
@@ -582,7 +582,7 @@ func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveSubscriber(key string) (err error) {
|
||||
func (ms *MapStorage) RemoveSubscriberDrv(key string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key)
|
||||
|
||||
@@ -1057,7 +1057,7 @@ func (ms *MongoStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
|
||||
func (ms *MongoStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) {
|
||||
session, col := ms.conn(colPbs)
|
||||
defer session.Close()
|
||||
iter := col.Find(nil).Iter()
|
||||
@@ -1073,7 +1073,7 @@ func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
|
||||
func (ms *MongoStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) {
|
||||
session, col := ms.conn(colPbs)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"key": key}, &struct {
|
||||
@@ -1083,7 +1083,7 @@ func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err erro
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveSubscriber(key string) (err error) {
|
||||
func (ms *MongoStorage) RemoveSubscriberDrv(key string) (err error) {
|
||||
session, col := ms.conn(colPbs)
|
||||
defer session.Close()
|
||||
return col.Remove(bson.M{"key": key})
|
||||
|
||||
@@ -611,7 +611,7 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
return rs.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
|
||||
func (rs *RedisStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) {
|
||||
keys, err := rs.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -634,7 +634,7 @@ func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
|
||||
func (rs *RedisStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) {
|
||||
result, err := rs.ms.Marshal(sub)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -642,7 +642,7 @@ func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err erro
|
||||
return rs.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveSubscriber(key string) (err error) {
|
||||
func (rs *RedisStorage) RemoveSubscriberDrv(key string) (err error) {
|
||||
err = rs.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user