simplified pubsub event

This commit is contained in:
Radu Ioan Fericean
2015-07-27 22:12:43 +03:00
parent 4a61f02b80
commit 20ed3ea34f
6 changed files with 20 additions and 26 deletions

View File

@@ -86,6 +86,8 @@ func executeCommand(command string) {
param = param.(*console.StringWrapper).Item
case *console.StringSliceWrapper:
param = param.(*console.StringSliceWrapper).Items
case *console.StringMapWrapper:
param = param.(*console.StringMapWrapper).Items
}
//log.Printf("Param: %+v", param)

View File

@@ -89,4 +89,8 @@ type StringSliceWrapper struct {
Items []string
}
type StringMapWrapper struct {
Items map[string]string
}
type EmptyWrapper struct{}

View File

@@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import "github.com/cgrates/cgrates/engine"
func init() {
c := &CmdPublish{
name: "publish",
@@ -32,7 +30,7 @@ func init() {
type CmdPublish struct {
name string
rpcMethod string
rpcParams *engine.PublishInfo
rpcParams *StringMapWrapper
*CommandExecuter
}
@@ -46,7 +44,7 @@ func (self *CmdPublish) RpcMethod() string {
func (self *CmdPublish) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &engine.PublishInfo{}
self.rpcParams = &StringMapWrapper{}
}
return self.rpcParams
}

View File

@@ -115,7 +115,7 @@ func SetUserService(us UserService) {
func Publish(event CgrEvent) {
if pubSubServer != nil {
var s string
pubSubServer.Publish(PublishInfo{Event: event}, &s)
pubSubServer.Publish(event, &s)
}
}

View File

@@ -28,14 +28,10 @@ func (ce CgrEvent) PassFilters(rsrFields utils.RSRFields) bool {
return true
}
type PublishInfo struct {
Event CgrEvent
}
type PublisherSubscriber interface {
Subscribe(SubscribeInfo, *string) error
Unsubscribe(SubscribeInfo, *string) error
Publish(PublishInfo, *string) error
Publish(CgrEvent, *string) error
ShowSubscribers(string, *map[string]*SubscriberData) error
}
@@ -123,16 +119,16 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
return nil
}
func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
ps.mux.Lock()
defer ps.mux.Unlock()
for key, subData := range ps.subscribers {
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
delete(ps.subscribers, key)
ps.removeSubscriber(key)
continue // subscription expired, do not send event
continue // subscription exevtred, do not send event
}
if subData.Filters == nil || !pi.Event.PassFilters(subData.Filters) {
if subData.Filters == nil || !evt.PassFilters(subData.Filters) {
continue // the event does not match the filters
}
split := utils.InfieldSplit(key)
@@ -148,10 +144,10 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil {
if _, err := ps.pubFunc(address, ps.ttlVerify, evt); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"]))
Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))
break
}
time.Sleep(delay())
@@ -186,8 +182,8 @@ func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error {
func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
return ps.Client.Call("PubSubV1.Unsubscribe", si, reply)
}
func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error {
return ps.Client.Call("PubSubV1.Publish", pi, reply)
func (ps *ProxyPubSub) Publish(evt CgrEvent, reply *string) error {
return ps.Client.Call("PubSubV1.Publish", evt, reply)
}
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error {

View File

@@ -133,9 +133,7 @@ func TestPublish(t *testing.T) {
}
m := make(map[string]string)
m["EventFilter"] = "test"
if err := ps.Publish(PublishInfo{
Event: m,
}, &r); err != nil {
if err := ps.Publish(m, &r); err != nil {
t.Error("Error publishing: ", err)
}
for i := 0; i < 1000; i++ { // wait for the theread to populate map
@@ -166,9 +164,7 @@ func TestPublishExpired(t *testing.T) {
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
if err := ps.Publish(PublishInfo{
Event: map[string]string{"EventFilter": "test"},
}, &r); err != nil {
if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil {
t.Error("Error publishing: ", err)
}
if len(ps.subscribers) != 0 {
@@ -196,9 +192,7 @@ func TestPublishExpiredSave(t *testing.T) {
if err != nil || len(subs) != 1 {
t.Error("Error saving subscribers: ", err, subs)
}
if err := ps.Publish(PublishInfo{
Event: map[string]string{"EventFilter": "test"},
}, &r); err != nil {
if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil {
t.Error("Error publishing: ", err)
}
subs, err = accountingStorage.GetSubscribers()