diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 6301ce4e2..3aa480d1f 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -86,6 +86,8 @@ func executeCommand(command string) { param = param.(*console.StringWrapper).Item case *console.StringSliceWrapper: param = param.(*console.StringSliceWrapper).Items + case *console.StringMapWrapper: + param = param.(*console.StringMapWrapper).Items } //log.Printf("Param: %+v", param) diff --git a/console/command.go b/console/command.go index feb801808..65ded1b0e 100644 --- a/console/command.go +++ b/console/command.go @@ -89,4 +89,8 @@ type StringSliceWrapper struct { Items []string } +type StringMapWrapper struct { + Items map[string]string +} + type EmptyWrapper struct{} diff --git a/console/publish.go b/console/publish.go index c46c68a82..78de7679c 100644 --- a/console/publish.go +++ b/console/publish.go @@ -18,8 +18,6 @@ along with this program. If not, see package console -import "github.com/cgrates/cgrates/engine" - func init() { c := &CmdPublish{ name: "publish", @@ -32,7 +30,7 @@ func init() { type CmdPublish struct { name string rpcMethod string - rpcParams *engine.PublishInfo + rpcParams *StringMapWrapper *CommandExecuter } @@ -46,7 +44,7 @@ func (self *CmdPublish) RpcMethod() string { func (self *CmdPublish) RpcParams(reset bool) interface{} { if reset || self.rpcParams == nil { - self.rpcParams = &engine.PublishInfo{} + self.rpcParams = &StringMapWrapper{} } return self.rpcParams } diff --git a/engine/calldesc.go b/engine/calldesc.go index 8024184b2..eb83d748b 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -115,7 +115,7 @@ func SetUserService(us UserService) { func Publish(event CgrEvent) { if pubSubServer != nil { var s string - pubSubServer.Publish(PublishInfo{Event: event}, &s) + pubSubServer.Publish(event, &s) } } diff --git a/engine/pubsub.go b/engine/pubsub.go index d8c372fd1..76a6bd79b 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -28,14 +28,10 @@ func (ce CgrEvent) PassFilters(rsrFields utils.RSRFields) bool { return true } -type PublishInfo struct { - Event CgrEvent -} - type PublisherSubscriber interface { Subscribe(SubscribeInfo, *string) error Unsubscribe(SubscribeInfo, *string) error - Publish(PublishInfo, *string) error + Publish(CgrEvent, *string) error ShowSubscribers(string, *map[string]*SubscriberData) error } @@ -123,16 +119,16 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return nil } -func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { +func (ps *PubSub) Publish(evt CgrEvent, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() for key, subData := range ps.subscribers { if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) { delete(ps.subscribers, key) ps.removeSubscriber(key) - continue // subscription expired, do not send event + continue // subscription exevtred, do not send event } - if subData.Filters == nil || !pi.Event.PassFilters(subData.Filters) { + if subData.Filters == nil || !evt.PassFilters(subData.Filters) { continue // the event does not match the filters } split := utils.InfieldSplit(key) @@ -148,10 +144,10 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { + if _, err := ps.pubFunc(address, ps.ttlVerify, evt); err == nil { break // Success, no need to reinterate } else if i == 4 { // Last iteration, syslog the warning - Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"])) break } time.Sleep(delay()) @@ -186,8 +182,8 @@ func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return ps.Client.Call("PubSubV1.Unsubscribe", si, reply) } -func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error { - return ps.Client.Call("PubSubV1.Publish", pi, reply) +func (ps *ProxyPubSub) Publish(evt CgrEvent, reply *string) error { + return ps.Client.Call("PubSubV1.Publish", evt, reply) } func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error { diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index d506ec291..470553b3d 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -133,9 +133,7 @@ func TestPublish(t *testing.T) { } m := make(map[string]string) m["EventFilter"] = "test" - if err := ps.Publish(PublishInfo{ - Event: m, - }, &r); err != nil { + if err := ps.Publish(m, &r); err != nil { t.Error("Error publishing: ", err) } for i := 0; i < 1000; i++ { // wait for the theread to populate map @@ -166,9 +164,7 @@ func TestPublishExpired(t *testing.T) { }, &r); err != nil { t.Error("Error subscribing: ", err) } - if err := ps.Publish(PublishInfo{ - Event: map[string]string{"EventFilter": "test"}, - }, &r); err != nil { + if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil { t.Error("Error publishing: ", err) } if len(ps.subscribers) != 0 { @@ -196,9 +192,7 @@ func TestPublishExpiredSave(t *testing.T) { if err != nil || len(subs) != 1 { t.Error("Error saving subscribers: ", err, subs) } - if err := ps.Publish(PublishInfo{ - Event: map[string]string{"EventFilter": "test"}, - }, &r); err != nil { + if err := ps.Publish(map[string]string{"EventFilter": "test"}, &r); err != nil { t.Error("Error publishing: ", err) } subs, err = accountingStorage.GetSubscribers()