mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-13 02:56:24 +05:00
PubSubS parsing RSR rules on start
This commit is contained in:
@@ -442,8 +442,13 @@ func startHistoryServer(internalHistorySChan chan rpcclient.RpcClientConnection,
|
||||
internalHistorySChan <- scribeServer
|
||||
}
|
||||
|
||||
func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) {
|
||||
pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
|
||||
func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
|
||||
pubSubServer, err := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<PubSubS> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
server.RpcRegisterName("PubSubV1", pubSubServer)
|
||||
internalPubSubSChan <- pubSubServer
|
||||
}
|
||||
@@ -749,7 +754,7 @@ func main() {
|
||||
|
||||
// Start PubSubS service
|
||||
if cfg.PubSubServerEnabled {
|
||||
go startPubSubServer(internalPubSubSChan, accountDb, server)
|
||||
go startPubSubServer(internalPubSubSChan, accountDb, server, exitChan)
|
||||
}
|
||||
|
||||
// Start Aliases service
|
||||
|
||||
@@ -59,7 +59,7 @@ type PubSub struct {
|
||||
accountDb AccountingStorage
|
||||
}
|
||||
|
||||
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
|
||||
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) (*PubSub, error) {
|
||||
ps := &PubSub{
|
||||
ttlVerify: ttlVerify,
|
||||
subscribers: make(map[string]*SubscriberData),
|
||||
@@ -68,10 +68,17 @@ func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
|
||||
accountDb: accountDb,
|
||||
}
|
||||
// load subscribers
|
||||
if subs, err := accountDb.GetSubscribers(); err == nil {
|
||||
if subs, err := accountDb.GetSubscribers(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
ps.subscribers = subs
|
||||
}
|
||||
return ps
|
||||
for _, sData := range ps.subscribers {
|
||||
if err := sData.Filters.ParseRules(); err != nil { // Parse rules into regexp objects
|
||||
utils.Logger.Err(fmt.Sprintf("<PubSub> Error <%s> when parsing rules out of subscriber data: %+v", err.Error(), sData))
|
||||
}
|
||||
}
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) saveSubscriber(key string) {
|
||||
|
||||
@@ -25,7 +25,10 @@ import (
|
||||
)
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -41,7 +44,10 @@ func TestSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubscribeSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -58,7 +64,10 @@ func TestSubscribeSave(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubscribeNoTransport(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -71,7 +80,10 @@ func TestSubscribeNoTransport(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubscribeNoExpire(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -87,7 +99,10 @@ func TestSubscribeNoExpire(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -110,7 +125,10 @@ func TestUnsubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnsubscribeSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
ps, err := NewPubSub(accountingStorage, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventFilter: "EventName/test",
|
||||
@@ -134,7 +152,10 @@ func TestUnsubscribeSave(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPublishExpired(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps, err := NewPubSub(accountingStorage, true)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -156,7 +177,10 @@ func TestPublishExpired(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPublishExpiredSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps, err := NewPubSub(accountingStorage, true)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -288,3 +288,12 @@ func (flds RSRFields) Id() string {
|
||||
}
|
||||
return flds[0].Id
|
||||
}
|
||||
|
||||
func (flds RSRFields) ParseRules() (err error) {
|
||||
for _, rsrFld := range flds {
|
||||
if err = rsrFld.ParseRules(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user