mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
refactoring and saving subscribers
This commit is contained in:
@@ -35,7 +35,6 @@ import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/pubsub"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/sessionmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -68,7 +67,7 @@ var (
|
||||
exitChan = make(chan bool)
|
||||
server = &engine.Server{}
|
||||
scribeServer history.Scribe
|
||||
pubSubServer pubsub.PublisherSubscriber
|
||||
pubSubServer engine.PublisherSubscriber
|
||||
cdrServer *engine.CdrServer
|
||||
cdrStats *engine.Stats
|
||||
cfg *config.CGRConfig
|
||||
@@ -338,7 +337,7 @@ func startHistoryServer(chanDone chan struct{}) {
|
||||
// chanStartServer will report when server is up, useful for internal requests
|
||||
func startHistoryAgent(chanServerStarted chan struct{}) {
|
||||
if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
|
||||
//engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Timeout waiting for server to start."))
|
||||
@@ -365,8 +364,8 @@ func startHistoryAgent(chanServerStarted chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
func startPubSubServer(chanDone chan struct{}) {
|
||||
if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil {
|
||||
func startPubSubServer(chanDone chan struct{}, accountDb engine.AccountingStorage) {
|
||||
if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
@@ -376,9 +375,8 @@ func startPubSubServer(chanDone chan struct{}) {
|
||||
}
|
||||
|
||||
// chanStartServer will report when server is up, useful for internal requests
|
||||
func startPubSubAgent(chanServerStarted chan struct{}) {
|
||||
func startPubSubAgent(chanServerStarted chan struct{}, accountDb engine.AccountingStorage) {
|
||||
if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Connecting internally to PubSubServer"))
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Timeout waiting for server to start."))
|
||||
@@ -391,7 +389,7 @@ func startPubSubAgent(chanServerStarted chan struct{}) {
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
|
||||
//engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
|
||||
if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil {
|
||||
if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err == nil {
|
||||
break //Connected so no need to reiterate
|
||||
} else if i == 2 && err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Could not connect to the server, error: %s", err.Error()))
|
||||
@@ -625,12 +623,12 @@ func main() {
|
||||
if cfg.PubSubServerEnabled {
|
||||
pubsubServChan = make(chan struct{})
|
||||
rpcWait = append(rpcWait, pubsubServChan)
|
||||
go startPubSubServer(pubsubServChan)
|
||||
go startPubSubServer(pubsubServChan, accountDb)
|
||||
}
|
||||
|
||||
if cfg.PubSubAgentEnabled {
|
||||
engine.Logger.Info("Starting CGRateS PubSub Agent.")
|
||||
go startPubSubAgent(pubsubServChan)
|
||||
go startPubSubAgent(pubsubServChan, accountDb)
|
||||
}
|
||||
|
||||
var cdrsChan chan struct{}
|
||||
|
||||
@@ -49,7 +49,9 @@ func generalSignalHandler() {
|
||||
sig := <-c
|
||||
engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig))
|
||||
var dummyInt int
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
if cdrStats != nil {
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/pubsub"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -68,7 +67,7 @@ var (
|
||||
debitPeriod = 10 * time.Second
|
||||
globalRoundingDecimals = 10
|
||||
historyScribe history.Scribe
|
||||
pubSubServer pubsub.PublisherSubscriber
|
||||
pubSubServer PublisherSubscriber
|
||||
//historyScribe, _ = history.NewMockScribe()
|
||||
)
|
||||
|
||||
@@ -105,7 +104,7 @@ func SetHistoryScribe(scribe history.Scribe) {
|
||||
historyScribe = scribe
|
||||
}
|
||||
|
||||
func SetPubSub(ps pubsub.PublisherSubscriber) {
|
||||
func SetPubSub(ps PublisherSubscriber) {
|
||||
pubSubServer = ps
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package pubsub
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -32,14 +33,38 @@ type PubSub struct {
|
||||
ttlVerify bool
|
||||
pubFunc func(string, bool, interface{}) ([]byte, error)
|
||||
mux *sync.Mutex
|
||||
accountDb AccountingStorage
|
||||
}
|
||||
|
||||
func NewPubSub(ttlVerify bool) *PubSub {
|
||||
return &PubSub{
|
||||
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
|
||||
ps := &PubSub{
|
||||
ttlVerify: ttlVerify,
|
||||
subscribers: make(map[string]map[string]time.Time),
|
||||
pubFunc: utils.HttpJsonPost,
|
||||
mux: &sync.Mutex{},
|
||||
accountDb: accountDb,
|
||||
}
|
||||
// load subscribers
|
||||
if subs, err := accountDb.GetPubSubSubscribers(); err == nil {
|
||||
ps.subscribers = subs
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
func (ps *PubSub) saveSubscribers(key string) {
|
||||
if key != "" {
|
||||
if _, found := ps.subscribers[key]; !found {
|
||||
return
|
||||
}
|
||||
if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil {
|
||||
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
|
||||
}
|
||||
} else { // save all
|
||||
for key, valueMap := range ps.subscribers {
|
||||
if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil {
|
||||
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +83,7 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
|
||||
expTime = time.Now().Add(si.LifeSpan)
|
||||
}
|
||||
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime
|
||||
ps.saveSubscribers(si.EventName)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
@@ -70,6 +96,7 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
|
||||
return errors.New(*reply)
|
||||
}
|
||||
delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address))
|
||||
ps.saveSubscribers(si.EventName)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
@@ -78,15 +105,17 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
ps.mux.Lock()
|
||||
defer ps.mux.Unlock()
|
||||
subs := ps.subscribers[pi.Event["EventName"]]
|
||||
for transport_address, expTime := range subs {
|
||||
split := utils.InfieldSplit(transport_address)
|
||||
for transportAddress, expTime := range subs {
|
||||
split := utils.InfieldSplit(transportAddress)
|
||||
if len(split) != 2 {
|
||||
Logger.Warning("<PubSub> Wrong transport;address pair: " + transportAddress)
|
||||
continue
|
||||
}
|
||||
transport := split[0]
|
||||
address := split[1]
|
||||
if !expTime.IsZero() && expTime.Before(time.Now()) {
|
||||
delete(subs, transport_address)
|
||||
delete(subs, transportAddress)
|
||||
ps.saveSubscribers(pi.Event["EventName"])
|
||||
continue // subscription expired, do not send event
|
||||
}
|
||||
switch transport {
|
||||
@@ -96,6 +125,9 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
|
||||
if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil {
|
||||
break // Success, no need to reinterate
|
||||
} else if i == 4 { // Last iteration, syslog the warning
|
||||
Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"]))
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package pubsub
|
||||
package engine
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
ps := NewPubSub(false)
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
@@ -24,7 +24,7 @@ func TestSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubscribeNoTransport(t *testing.T) {
|
||||
ps := NewPubSub(false)
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
@@ -37,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubscribeNoExpire(t *testing.T) {
|
||||
ps := NewPubSub(false)
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
@@ -53,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
ps := NewPubSub(false)
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
@@ -76,7 +76,7 @@ func TestUnsubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
ps := NewPubSub(true)
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
|
||||
obj.(map[string]string)["called"] = url
|
||||
return nil, nil
|
||||
@@ -110,7 +110,7 @@ func TestPublish(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPublishExpired(t *testing.T) {
|
||||
ps := NewPubSub(true)
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
|
||||
m := obj.(map[string]string)
|
||||
m["called"] = "yes"
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/ugorji/go/codec"
|
||||
@@ -79,6 +80,8 @@ type AccountingStorage interface {
|
||||
SetAccount(*Account) error
|
||||
GetCdrStatsQueue(string) (*StatsQueue, error)
|
||||
SetCdrStatsQueue(*StatsQueue) error
|
||||
GetPubSubSubscribers() (map[string]map[string]time.Time, error)
|
||||
SetPubSubSubscribers(string, map[string]time.Time) error
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
|
||||
@@ -561,6 +561,24 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) {
|
||||
result = make(map[string]map[string]time.Time)
|
||||
for key, value := range ms.dict {
|
||||
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
|
||||
subs := make(map[string]time.Time)
|
||||
if err = ms.ms.Unmarshal(value, subs); err == nil {
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) {
|
||||
result, err := ms.ms.Marshal(subs)
|
||||
ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key] = result
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok {
|
||||
err = ms.ms.Unmarshal(values, &ats)
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"compress/zlib"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -689,6 +690,32 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) {
|
||||
keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = make(map[string]map[string]time.Time)
|
||||
for _, key := range keys {
|
||||
log.Print("KEY: ", key)
|
||||
if values, err := rs.db.Get(key); err == nil {
|
||||
subs := make(map[string]time.Time)
|
||||
err = rs.ms.Unmarshal(values, subs)
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
} else {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
log.Print("XXX: ", result)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) {
|
||||
result, err := rs.ms.Marshal(subs)
|
||||
rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {
|
||||
|
||||
@@ -165,6 +165,7 @@ const (
|
||||
LCR_PREFIX = "lcr_"
|
||||
DERIVEDCHARGERS_PREFIX = "dcs_"
|
||||
CDR_STATS_QUEUE_PREFIX = "csq_"
|
||||
PUBSUB_SUBSCRIBERS_PREFIX = "pss_"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
|
||||
Reference in New Issue
Block a user