diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c46775254..693fdf5b5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -35,7 +35,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" @@ -68,7 +67,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe - pubSubServer pubsub.PublisherSubscriber + pubSubServer engine.PublisherSubscriber cdrServer *engine.CdrServer cdrStats *engine.Stats cfg *config.CGRConfig @@ -338,7 +337,7 @@ func startHistoryServer(chanDone chan struct{}) { // chanStartServer will report when server is up, useful for internal requests func startHistoryAgent(chanServerStarted chan struct{}) { if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting - engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + //engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) select { case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) @@ -365,8 +364,8 @@ func startHistoryAgent(chanServerStarted chan struct{}) { return } -func startPubSubServer(chanDone chan struct{}) { - if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err != nil { +func startPubSubServer(chanDone chan struct{}, accountDb engine.AccountingStorage) { + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true return @@ -376,9 +375,8 @@ func startPubSubServer(chanDone chan struct{}) { } // chanStartServer will report when server is up, useful for internal requests -func startPubSubAgent(chanServerStarted chan struct{}) { +func startPubSubAgent(chanServerStarted chan struct{}, accountDb engine.AccountingStorage) { if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting - engine.Logger.Crit(fmt.Sprintf(" Connecting internally to PubSubServer")) select { case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) @@ -391,7 +389,7 @@ func startPubSubAgent(chanServerStarted chan struct{}) { delay := utils.Fib() for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) - if pubSubServer = pubsub.NewPubSub(cfg.HttpSkipTlsVerify); err == nil { + if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err == nil { break //Connected so no need to reiterate } else if i == 2 && err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) @@ -625,12 +623,12 @@ func main() { if cfg.PubSubServerEnabled { pubsubServChan = make(chan struct{}) rpcWait = append(rpcWait, pubsubServChan) - go startPubSubServer(pubsubServChan) + go startPubSubServer(pubsubServChan, accountDb) } if cfg.PubSubAgentEnabled { engine.Logger.Info("Starting CGRateS PubSub Agent.") - go startPubSubAgent(pubsubServChan) + go startPubSubAgent(pubsubServChan, accountDb) } var cdrsChan chan struct{} diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index a929eaada..97837b68b 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -49,7 +49,9 @@ func generalSignalHandler() { sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig)) var dummyInt int - cdrStats.Stop(dummyInt, &dummyInt) + if cdrStats != nil { + cdrStats.Stop(dummyInt, &dummyInt) + } exitChan <- true } diff --git a/engine/calldesc.go b/engine/calldesc.go index 65ecfdd58..2e45d3c54 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -29,7 +29,6 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/pubsub" "github.com/cgrates/cgrates/utils" ) @@ -68,7 +67,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 10 historyScribe history.Scribe - pubSubServer pubsub.PublisherSubscriber + pubSubServer PublisherSubscriber //historyScribe, _ = history.NewMockScribe() ) @@ -105,7 +104,7 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } -func SetPubSub(ps pubsub.PublisherSubscriber) { +func SetPubSub(ps PublisherSubscriber) { pubSubServer = ps } diff --git a/pubsub/pubsub.go b/engine/pubsub.go similarity index 69% rename from pubsub/pubsub.go rename to engine/pubsub.go index 28f890517..8bc8ecee8 100644 --- a/pubsub/pubsub.go +++ b/engine/pubsub.go @@ -1,7 +1,8 @@ -package pubsub +package engine import ( "errors" + "fmt" "sync" "time" @@ -32,14 +33,38 @@ type PubSub struct { ttlVerify bool pubFunc func(string, bool, interface{}) ([]byte, error) mux *sync.Mutex + accountDb AccountingStorage } -func NewPubSub(ttlVerify bool) *PubSub { - return &PubSub{ +func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub { + ps := &PubSub{ ttlVerify: ttlVerify, subscribers: make(map[string]map[string]time.Time), pubFunc: utils.HttpJsonPost, mux: &sync.Mutex{}, + accountDb: accountDb, + } + // load subscribers + if subs, err := accountDb.GetPubSubSubscribers(); err == nil { + ps.subscribers = subs + } + return ps +} + +func (ps *PubSub) saveSubscribers(key string) { + if key != "" { + if _, found := ps.subscribers[key]; !found { + return + } + if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } else { // save all + for key, valueMap := range ps.subscribers { + if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil { + Logger.Err(" Error saving subscribers: " + err.Error()) + } + } } } @@ -58,6 +83,7 @@ func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error { expTime = time.Now().Add(si.LifeSpan) } ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = expTime + ps.saveSubscribers(si.EventName) *reply = utils.OK return nil } @@ -70,6 +96,7 @@ func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error { return errors.New(*reply) } delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address)) + ps.saveSubscribers(si.EventName) *reply = utils.OK return nil } @@ -78,15 +105,17 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { ps.mux.Lock() defer ps.mux.Unlock() subs := ps.subscribers[pi.Event["EventName"]] - for transport_address, expTime := range subs { - split := utils.InfieldSplit(transport_address) + for transportAddress, expTime := range subs { + split := utils.InfieldSplit(transportAddress) if len(split) != 2 { + Logger.Warning(" Wrong transport;address pair: " + transportAddress) continue } transport := split[0] address := split[1] if !expTime.IsZero() && expTime.Before(time.Now()) { - delete(subs, transport_address) + delete(subs, transportAddress) + ps.saveSubscribers(pi.Event["EventName"]) continue // subscription expired, do not send event } switch transport { @@ -96,6 +125,9 @@ func (ps *PubSub) Publish(pi PublishInfo, reply *string) error { for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil { break // Success, no need to reinterate + } else if i == 4 { // Last iteration, syslog the warning + Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"])) + break } time.Sleep(delay()) } diff --git a/pubsub/pubsub_test.go b/engine/pubsub_test.go similarity index 92% rename from pubsub/pubsub_test.go rename to engine/pubsub_test.go index 71a94fe64..6571604ee 100644 --- a/pubsub/pubsub_test.go +++ b/engine/pubsub_test.go @@ -1,4 +1,4 @@ -package pubsub +package engine import ( "testing" @@ -8,7 +8,7 @@ import ( ) func TestSubscribe(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -24,7 +24,7 @@ func TestSubscribe(t *testing.T) { } func TestSubscribeNoTransport(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -37,7 +37,7 @@ func TestSubscribeNoTransport(t *testing.T) { } func TestSubscribeNoExpire(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -53,7 +53,7 @@ func TestSubscribeNoExpire(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ps := NewPubSub(false) + ps := NewPubSub(accountingStorage, false) var r string if err := ps.Subscribe(SubscribeInfo{ EventName: "test", @@ -76,7 +76,7 @@ func TestUnsubscribe(t *testing.T) { } func TestPublish(t *testing.T) { - ps := NewPubSub(true) + ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { obj.(map[string]string)["called"] = url return nil, nil @@ -110,7 +110,7 @@ func TestPublish(t *testing.T) { } func TestPublishExpired(t *testing.T) { - ps := NewPubSub(true) + ps := NewPubSub(accountingStorage, true) ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { m := obj.(map[string]string) m["called"] = "yes" diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 894905304..631d56f03 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -23,6 +23,7 @@ import ( "encoding/gob" "encoding/json" "reflect" + "time" "github.com/cgrates/cgrates/utils" "github.com/ugorji/go/codec" @@ -79,6 +80,8 @@ type AccountingStorage interface { SetAccount(*Account) error GetCdrStatsQueue(string) (*StatsQueue, error) SetCdrStatsQueue(*StatsQueue) error + GetPubSubSubscribers() (map[string]map[string]time.Time, error) + SetPubSubSubscribers(string, map[string]time.Time) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index e350c5ff6..01ec265c8 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -561,6 +561,24 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { + result = make(map[string]map[string]time.Time) + for key, value := range ms.dict { + if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { + subs := make(map[string]time.Time) + if err = ms.ms.Unmarshal(value, subs); err == nil { + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } + } + } + return +} +func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { + result, err := ms.ms.Marshal(subs) + ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key] = result + return +} + func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) { if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 41bb26b74..42a7164f0 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,6 +23,7 @@ import ( "compress/zlib" "errors" "fmt" + "log" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -689,6 +690,32 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { return } +func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]time.Time, err error) { + keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*") + if err != nil { + return nil, err + } + result = make(map[string]map[string]time.Time) + for _, key := range keys { + log.Print("KEY: ", key) + if values, err := rs.db.Get(key); err == nil { + subs := make(map[string]time.Time) + err = rs.ms.Unmarshal(values, subs) + result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs + } else { + return nil, utils.ErrNotFound + } + } + log.Print("XXX: ", result) + return +} + +func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]time.Time) (err error) { + result, err := rs.ms.Marshal(subs) + rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result) + return +} + func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) { var values []byte if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil { diff --git a/utils/consts.go b/utils/consts.go index 5a11dc0c0..458b426e1 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -165,6 +165,7 @@ const ( LCR_PREFIX = "lcr_" DERIVEDCHARGERS_PREFIX = "dcs_" CDR_STATS_QUEUE_PREFIX = "csq_" + PUBSUB_SUBSCRIBERS_PREFIX = "pss_" CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" LOG_CALL_COST_PREFIX = "cco_"