From d36eaf166bb44f0d1a194e0399daf508798fde1f Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 30 Oct 2017 16:25:10 +0200 Subject: [PATCH] Add Get/Set/Remove Subscriber in DataManager --- engine/datamanager.go | 12 ++++++++++++ engine/onstor_it_test.go | 10 +++++----- engine/pubsub.go | 6 +++--- engine/pubsub_test.go | 8 ++++---- engine/storage_interface.go | 6 +++--- engine/storage_map.go | 6 +++--- engine/storage_mongo_datadb.go | 6 +++--- engine/storage_redis.go | 6 +++--- 8 files changed, 36 insertions(+), 24 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index f19c35e4d..235685175 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -765,3 +765,15 @@ func (dm *DataManager) GetUsers() (result []*UserProfile, err error) { func (dm *DataManager) RemoveUser(key string) error { return dm.DataDB().RemoveUserDrv(key) } + +func (dm *DataManager) GetSubscribers() (result map[string]*SubscriberData, err error) { + return dm.DataDB().GetSubscribersDrv() +} + +func (dm *DataManager) SetSubscriber(key string, sub *SubscriberData) (err error) { + return dm.DataDB().SetSubscriberDrv(key, sub) +} + +func (dm *DataManager) RemoveSubscriber(key string) (err error) { + return dm.DataDB().RemoveSubscriberDrv(key) +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 58fefa9ca..546b17b18 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -1607,7 +1607,7 @@ func testOnStorITCRUDCdrStatsQueue(t *testing.T) { } func testOnStorITCRUDSubscribers(t *testing.T) { - if sbs, err := onStor.DataDB().GetSubscribers(); err != nil { + if sbs, err := onStor.GetSubscribers(); err != nil { t.Error(err) } else if len(sbs) != 0 { t.Errorf("Received subscribers: %+v", sbs) @@ -1616,18 +1616,18 @@ func testOnStorITCRUDSubscribers(t *testing.T) { ExpTime: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Filters: utils.ParseRSRFieldsMustCompile("^*default", utils.INFIELD_SEP)} sbscID := "testOnStorITCRUDSubscribers" - if err := onStor.DataDB().SetSubscriber(sbscID, sbsc); err != nil { + if err := onStor.SetSubscriber(sbscID, sbsc); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetSubscribers(); err != nil { + if rcv, err := onStor.GetSubscribers(); err != nil { t.Error(err) } else if !reflect.DeepEqual(sbsc.ExpTime, rcv[sbscID].ExpTime) { // Test just ExpTime since RSRField is more complex behind t.Errorf("Expecting: %v, received: %v", sbsc, rcv[sbscID]) } - if err := onStor.DataDB().RemoveSubscriber(sbscID); err != nil { + if err := onStor.RemoveSubscriber(sbscID); err != nil { t.Error(err) } - if sbs, err := onStor.DataDB().GetSubscribers(); err != nil { + if sbs, err := onStor.GetSubscribers(); err != nil { t.Error(err) } else if len(sbs) != 0 { t.Errorf("Received subscribers: %+v", sbs) diff --git a/engine/pubsub.go b/engine/pubsub.go index 503bc0586..9c4307f8a 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -69,7 +69,7 @@ func NewPubSub(dm *DataManager, ttlVerify bool) (*PubSub, error) { dm: dm, } // load subscribers - if subs, err := dm.DataDB().GetSubscribers(); err != nil { + if subs, err := dm.GetSubscribers(); err != nil { return nil, err } else { ps.subscribers = subs @@ -87,13 +87,13 @@ func (ps *PubSub) saveSubscriber(key string) { if !found { return } - if err := dm.DataDB().SetSubscriber(key, subData); err != nil { + if err := dm.SetSubscriber(key, subData); err != nil { utils.Logger.Err(" Error saving subscriber: " + err.Error()) } } func (ps *PubSub) removeSubscriber(key string) { - if err := dm.DataDB().RemoveSubscriber(key); err != nil { + if err := dm.RemoveSubscriber(key); err != nil { utils.Logger.Err(" Error removing subscriber: " + err.Error()) } } diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index 4a615e212..080e67745 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -57,7 +57,7 @@ func TestSubscribeSave(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - subs, err := dm.DataDB().GetSubscribers() + subs, err := dm.GetSubscribers() if err != nil || len(subs) != 1 { t.Error("Error saving subscribers: ", err, subs) } @@ -145,7 +145,7 @@ func TestUnsubscribeSave(t *testing.T) { }, &r); err != nil { t.Error("Error unsubscribing: ", err) } - subs, err := dm.DataDB().GetSubscribers() + subs, err := dm.GetSubscribers() if err != nil || len(subs) != 0 { t.Error("Error saving subscribers: ", err, subs) } @@ -193,14 +193,14 @@ func TestPublishExpiredSave(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - subs, err := dm.DataDB().GetSubscribers() + subs, err := dm.GetSubscribers() if err != nil || len(subs) != 1 { t.Error("Error saving subscribers: ", err, subs) } if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil { t.Error("Error publishing: ", err) } - subs, err = dm.DataDB().GetSubscribers() + subs, err = dm.GetSubscribers() if err != nil || len(subs) != 0 { t.Error("Error saving subscribers: ", err, subs) } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index a0805833e..991038f4e 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -87,9 +87,9 @@ type DataDB interface { RemoveAccount(string) error GetCdrStatsQueue(string) (*CDRStatsQueue, error) SetCdrStatsQueue(*CDRStatsQueue) error - GetSubscribers() (map[string]*SubscriberData, error) - SetSubscriber(string, *SubscriberData) error - RemoveSubscriber(string) error + GetSubscribersDrv() (map[string]*SubscriberData, error) + SetSubscriberDrv(string, *SubscriberData) error + RemoveSubscriberDrv(string) error SetUserDrv(*UserProfile) error GetUserDrv(string) (*UserProfile, error) GetUsersDrv() ([]*UserProfile, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 30b3adecd..81afac8ee 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -560,7 +560,7 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) { return } -func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { +func (ms *MapStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) { ms.mu.RLock() defer ms.mu.RUnlock() result = make(map[string]*SubscriberData) @@ -574,7 +574,7 @@ func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err e } return } -func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { +func (ms *MapStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(sub) @@ -582,7 +582,7 @@ func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) return } -func (ms *MapStorage) RemoveSubscriber(key string) (err error) { +func (ms *MapStorage) RemoveSubscriberDrv(key string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index d89af4a3a..a57aee6ed 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1057,7 +1057,7 @@ func (ms *MongoStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) { return } -func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { +func (ms *MongoStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) { session, col := ms.conn(colPbs) defer session.Close() iter := col.Find(nil).Iter() @@ -1073,7 +1073,7 @@ func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err return } -func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { +func (ms *MongoStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) { session, col := ms.conn(colPbs) defer session.Close() _, err = col.Upsert(bson.M{"key": key}, &struct { @@ -1083,7 +1083,7 @@ func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err erro return err } -func (ms *MongoStorage) RemoveSubscriber(key string) (err error) { +func (ms *MongoStorage) RemoveSubscriberDrv(key string) (err error) { session, col := ms.conn(colPbs) defer session.Close() return col.Remove(bson.M{"key": key}) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 12efa65d4..a5cf1b743 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -611,7 +611,7 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) { return rs.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err } -func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { +func (rs *RedisStorage) GetSubscribersDrv() (result map[string]*SubscriberData, err error) { keys, err := rs.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List() if err != nil { return nil, err @@ -634,7 +634,7 @@ func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err return } -func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { +func (rs *RedisStorage) SetSubscriberDrv(key string, sub *SubscriberData) (err error) { result, err := rs.ms.Marshal(sub) if err != nil { return err @@ -642,7 +642,7 @@ func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err erro return rs.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err } -func (rs *RedisStorage) RemoveSubscriber(key string) (err error) { +func (rs *RedisStorage) RemoveSubscriberDrv(key string) (err error) { err = rs.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err return }