Files
cgrates/engine/pubsub.go
2015-11-26 23:02:03 +02:00

189 lines
4.5 KiB
Go

package engine
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
type SubscribeInfo struct {
EventFilter string
Transport string
Address string
LifeSpan time.Duration
}
type CgrEvent map[string]string
func (ce CgrEvent) PassFilters(rsrFields utils.RSRFields) bool {
for _, rsrFld := range rsrFields {
if !rsrFld.FilterPasses(ce[rsrFld.Id]) {
return false
}
}
return true
}
type SubscriberData struct {
ExpTime time.Time
Filters utils.RSRFields
}
type PubSub struct {
subscribers map[string]*SubscriberData
ttlVerify bool
pubFunc func(string, bool, interface{}) ([]byte, error)
mux *sync.Mutex
accountDb AccountingStorage
}
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
ps := &PubSub{
ttlVerify: ttlVerify,
subscribers: make(map[string]*SubscriberData),
pubFunc: utils.HttpJsonPost,
mux: &sync.Mutex{},
accountDb: accountDb,
}
// load subscribers
if subs, err := accountDb.GetSubscribers(); err == nil {
ps.subscribers = subs
}
return ps
}
func (ps *PubSub) saveSubscriber(key string) {
subData, found := ps.subscribers[key]
if !found {
return
}
if err := accountingStorage.SetSubscriber(key, subData); err != nil {
utils.Logger.Err("<PubSub> Error saving subscriber: " + err.Error())
}
}
func (ps *PubSub) removeSubscriber(key string) {
if err := accountingStorage.RemoveSubscriber(key); err != nil {
utils.Logger.Err("<PubSub> Error removing subscriber: " + err.Error())
}
}
func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
ps.mux.Lock()
defer ps.mux.Unlock()
if si.Transport != utils.META_HTTP_POST {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
var expTime time.Time
if si.LifeSpan > 0 {
expTime = time.Now().Add(si.LifeSpan)
}
rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP)
if err != nil {
*reply = err.Error()
return err
}
key := utils.InfieldJoin(si.Transport, si.Address)
ps.subscribers[key] = &SubscriberData{
ExpTime: expTime,
Filters: rsr,
}
ps.saveSubscriber(key)
*reply = utils.OK
return nil
}
func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
ps.mux.Lock()
defer ps.mux.Unlock()
if si.Transport != utils.META_HTTP_POST {
*reply = "Unsupported transport type"
return errors.New(*reply)
}
key := utils.InfieldJoin(si.Transport, si.Address)
delete(ps.subscribers, key)
ps.removeSubscriber(key)
*reply = utils.OK
return nil
}
func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
ps.mux.Lock()
defer ps.mux.Unlock()
evt["Timestamp"] = time.Now().Format(time.RFC3339Nano)
for key, subData := range ps.subscribers {
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
delete(ps.subscribers, key)
ps.removeSubscriber(key)
continue // subscription exevtred, do not send event
}
if subData.Filters == nil || !evt.PassFilters(subData.Filters) {
continue // the event does not match the filters
}
split := utils.InfieldSplit(key)
if len(split) != 2 {
utils.Logger.Warning("<PubSub> Wrong transport;address pair: " + key)
continue
}
transport := split[0]
address := split[1]
switch transport {
case utils.META_HTTP_POST:
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ps.ttlVerify, evt); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
utils.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))
break
}
time.Sleep(delay())
}
}()
}
}
*reply = utils.OK
return nil
}
func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) error {
*out = ps.subscribers
return nil
}
func (ps *PubSub) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(ps).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}