diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ddb792d7c..03f7af68c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -442,8 +442,13 @@ func startHistoryServer(internalHistorySChan chan rpcclient.RpcClientConnection, internalHistorySChan <- scribeServer } -func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) { - pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) +func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { + pubSubServer, err := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } server.RpcRegisterName("PubSubV1", pubSubServer) internalPubSubSChan <- pubSubServer } @@ -749,7 +754,7 @@ func main() { // Start PubSubS service if cfg.PubSubServerEnabled { - go startPubSubServer(internalPubSubSChan, accountDb, server) + go startPubSubServer(internalPubSubSChan, accountDb, server, exitChan) } // Start Aliases service diff --git a/engine/pubsub.go b/engine/pubsub.go index 95776e3ae..07879fd5a 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -59,7 +59,7 @@ type PubSub struct { accountDb AccountingStorage } -func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { +func NewPubSub(accountDb AccountingStorage, ttlVerify bool) (*PubSub, error) { ps := &PubSub{ ttlVerify: ttlVerify, subscribers: make(map[string]*SubscriberData), @@ -68,10 +68,17 @@ func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { accountDb: accountDb, } // load subscribers - if subs, err := accountDb.GetSubscribers(); err == nil { + if subs, err := accountDb.GetSubscribers(); err != nil { + return nil, err + } else { ps.subscribers = subs } - return ps + for _, sData := range ps.subscribers { + if err := sData.Filters.ParseRules(); err != nil { // Parse rules into regexp objects + utils.Logger.Err(fmt.Sprintf(" Error <%s> when parsing rules out of subscriber data: %+v", err.Error(), sData)) + } + } + return ps, nil } func (ps *PubSub) saveSubscriber(key string) { diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index 6da22d91d..e0e2cc552 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -25,7 +25,10 @@ import ( ) func TestSubscribe(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -41,7 +44,10 @@ func TestSubscribe(t *testing.T) { } func TestSubscribeSave(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -58,7 +64,10 @@ func TestSubscribeSave(t *testing.T) { } func TestSubscribeNoTransport(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -71,7 +80,10 @@ func TestSubscribeNoTransport(t *testing.T) { } func TestSubscribeNoExpire(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -87,7 +99,10 @@ func TestSubscribeNoExpire(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -110,7 +125,10 @@ func TestUnsubscribe(t *testing.T) { } func TestUnsubscribeSave(t *testing.T) { - ps := NewPubSub(accountingStorage, false) + ps, err := NewPubSub(accountingStorage, false) + if err != nil { + t.Error(err) + } var r string if err := ps.Subscribe(SubscribeInfo{ EventFilter: "EventName/test", @@ -134,7 +152,10 @@ func TestUnsubscribeSave(t *testing.T) { } func TestPublishExpired(t *testing.T) { - ps := NewPubSub(accountingStorage, true) + ps, err := NewPubSub(accountingStorage, true) + if err != nil { + t.Error(err) + } ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) { return nil, nil } @@ -156,7 +177,10 @@ func TestPublishExpired(t *testing.T) { } func TestPublishExpiredSave(t *testing.T) { - ps := NewPubSub(accountingStorage, true) + ps, err := NewPubSub(accountingStorage, true) + if err != nil { + t.Error(err) + } ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) { return nil, nil } diff --git a/utils/rsrfield.go b/utils/rsrfield.go index a31465508..6ab2c3f01 100644 --- a/utils/rsrfield.go +++ b/utils/rsrfield.go @@ -288,3 +288,12 @@ func (flds RSRFields) Id() string { } return flds[0].Id } + +func (flds RSRFields) ParseRules() (err error) { + for _, rsrFld := range flds { + if err = rsrFld.ParseRules(); err != nil { + break + } + } + return +}