diff --git a/agents/dmtagent.go b/agents/dmtagent.go index 342350643..f111412f8 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -30,8 +30,8 @@ import ( "github.com/fiorix/go-diameter/diam/sm" ) -func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient) (*DiameterAgent, error) { - da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg} +func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient, pubsubs *rpcclient.RpcClient) (*DiameterAgent, error) { + da := &DiameterAgent{cgrCfg: cgrCfg, smg: smg, pubsubs: pubsubs} dictsDir := cgrCfg.DiameterAgentCfg().DictionariesDir if len(dictsDir) != 0 { if err := loadDictionaries(dictsDir, "DiameterAgent"); err != nil { @@ -42,8 +42,9 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, smg *rpcclient.RpcClient) (*Diam } type DiameterAgent struct { - cgrCfg *config.CGRConfig - smg *rpcclient.RpcClient // Connection towards CGR-SMG component + cgrCfg *config.CGRConfig + smg *rpcclient.RpcClient // Connection towards CGR-SMG component + pubsubs *rpcclient.RpcClient // Connection towards CGR-PubSub component } // Creates the message handlers @@ -91,6 +92,18 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) return cca } + if reqProcessor.PublishEvent && self.pubsubs != nil { + evt, err := smgEv.AsMapStringString() + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v failed converting SMGEvent to pubsub one, error: %s", ccr.diamMessage, err)) + return nil + } + var reply string + if err := self.pubsubs.Call("PubSubV1.Publish", evt, reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v failed publishing event, error: %s", ccr.diamMessage, err)) + return nil + } + } var maxUsage float64 if reqProcessor.DryRun { // DryRun does not send over network utils.Logger.Info(fmt.Sprintf(" SMGenericEvent: %+v", smgEv)) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 352bd9ba0..a4c54bd3b 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -196,9 +196,9 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal server.BijsonRegisterOnDisconnect(smg_econns.OnClientDisconnect) } -func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) { +func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { utils.Logger.Info("Starting CGRateS DiameterAgent service.") - var smgConn *rpcclient.RpcClient + var smgConn, pubsubConn *rpcclient.RpcClient var err error if cfg.DiameterAgentCfg().SMGeneric == utils.INTERNAL { smgRpc := <-internalSMGChan @@ -212,7 +212,19 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit exitChan <- true return } - da, err := agents.NewDiameterAgent(cfg, smgConn) + if cfg.DiameterAgentCfg().PubSubS == utils.INTERNAL { + pubSubRpc := <-internalPubSubSChan + internalPubSubSChan <- pubSubRpc + pubsubConn, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, pubSubRpc) + } else if len(cfg.DiameterAgentCfg().PubSubS) != 0 { + pubsubConn, err = rpcclient.NewRpcClient("tcp", cfg.DiameterAgentCfg().PubSubS, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) + } + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) + exitChan <- true + return + } + da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn) if err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) exitChan <- true @@ -372,7 +384,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS } func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage, - internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber, + internalRaterChan chan *engine.Responder, internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, internalCdrStatSChan chan engine.StatsInterface, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS CDRS service.") @@ -394,23 +406,19 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, raterConn = &engine.RPCClientConnector{Client: client} } // Pubsub connection init - var pubSubConn engine.PublisherSubscriber + var pubSubConn rpcclient.RpcClientConnection if cfg.CDRSPubSub == utils.INTERNAL { pubSubs := <-internalPubSubSChan pubSubConn = pubSubs internalPubSubSChan <- pubSubs } else if len(cfg.CDRSPubSub) != 0 { - if cfg.CDRSRater == cfg.CDRSPubSub { - pubSubConn = &engine.ProxyPubSub{Client: client} - } else { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsub server: %s", err.Error())) - exitChan <- true - return - } - pubSubConn = &engine.ProxyPubSub{Client: client} + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsub server: %s", err.Error())) + exitChan <- true + return } + pubSubConn = client } // Users connection init var usersConn engine.UserService @@ -515,7 +523,7 @@ func startHistoryServer(internalHistorySChan chan history.Scribe, server *utils. internalHistorySChan <- scribeServer } -func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *utils.Server) { +func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) { pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) server.RpcRegisterName("PubSubV1", pubSubServer) internalPubSubSChan <- pubSubServer @@ -548,7 +556,7 @@ func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder, internalCdrSChan chan *engine.CdrServer, internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe, - internalPubSubSChan chan engine.PublisherSubscriber, + internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService) { select { // Any of the rpc methods will unlock listening to rpc requests @@ -674,7 +682,7 @@ func main() { internalCdrSChan := make(chan *engine.CdrServer, 1) internalCdrStatSChan := make(chan engine.StatsInterface, 1) internalHistorySChan := make(chan history.Scribe, 1) - internalPubSubSChan := make(chan engine.PublisherSubscriber, 1) + internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1) internalUserSChan := make(chan engine.UserService, 1) internalAliaseSChan := make(chan engine.AliasService, 1) internalSMGChan := make(chan rpcclient.RpcClientConnection, 1) @@ -735,7 +743,7 @@ func main() { } if cfg.DiameterAgentCfg().Enabled { - go startDiameterAgent(internalSMGChan, exitChan) + go startDiameterAgent(internalSMGChan, internalPubSubSChan, exitChan) } // Start HistoryS service diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 0ee787728..41535636d 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) { @@ -41,7 +42,7 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled // Starts rater and reports on chan func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler, internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe, - internalPubSubSChan chan engine.PublisherSubscriber, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, + internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, server *utils.Server, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage, stopHandled *bool, exitChan chan bool) { @@ -163,7 +164,7 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - var pubSubServer engine.PublisherSubscriber + var pubSubServer rpcclient.RpcClientConnection if cfg.RaterPubSubServer == utils.INTERNAL { select { case pubSubServer = <-internalPubSubSChan: @@ -173,7 +174,7 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str exitChan <- true return } - } else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil { + } else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsubs: %s", err.Error())) exitChan <- true return diff --git a/engine/calldesc.go b/engine/calldesc.go index c38b3be18..2a4ccc69d 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -30,6 +30,7 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) const ( @@ -76,7 +77,7 @@ var ( debitPeriod = 10 * time.Second globalRoundingDecimals = 5 historyScribe history.Scribe - pubSubServer PublisherSubscriber + pubSubServer rpcclient.RpcClientConnection userService UserService aliasService AliasService ) @@ -114,7 +115,7 @@ func SetHistoryScribe(scribe history.Scribe) { historyScribe = scribe } -func SetPubSub(ps PublisherSubscriber) { +func SetPubSub(ps rpcclient.RpcClientConnection) { pubSubServer = ps } @@ -129,7 +130,7 @@ func SetAliasService(as AliasService) { func Publish(event CgrEvent) { if pubSubServer != nil { var s string - pubSubServer.Publish(event, &s) + pubSubServer.Call("PubSubV1.Publish", event, &s) } } diff --git a/engine/cdrs.go b/engine/cdrs.go index 0d11122a3..b522b829c 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -27,6 +27,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" "github.com/jinzhu/gorm" mgov2 "gopkg.in/mgo.v2" ) @@ -66,7 +67,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { +func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub rpcclient.RpcClientConnection, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil } @@ -74,7 +75,7 @@ type CdrServer struct { cgrCfg *config.CGRConfig cdrDb CdrStorage rater Connector - pubsub PublisherSubscriber + pubsub rpcclient.RpcClientConnection users UserService aliases AliasService stats StatsInterface diff --git a/engine/pubsub.go b/engine/pubsub.go index 61e609020..2ae544df0 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -165,6 +165,53 @@ func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) er return nil } +// rpcclient.RpcClientConnection interface +func (ps *PubSub) Call(serviceMethod string, args interface{}, reply interface{}) error { + switch serviceMethod { + case "PubSubV1.Subscribe": + argsConverted, canConvert := args.(SubscribeInfo) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return ps.Subscribe(argsConverted, replyConverted) + case "PubSubV1.Unsubscribe": + argsConverted, canConvert := args.(SubscribeInfo) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return ps.Unsubscribe(argsConverted, replyConverted) + case "PubSubV1.Publish": + argsConverted, canConvert := args.(CgrEvent) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return ps.Publish(argsConverted, replyConverted) + case "PubSubV1.ShowSubscribers": + argsConverted, canConvert := args.(string) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*map[string]*SubscriberData) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return ps.ShowSubscribers(argsConverted, replyConverted) + } + return rpcclient.ErrUnsupporteServiceMethod +} + type ProxyPubSub struct { Client *rpcclient.RpcClient } diff --git a/sessionmanager/smg_event.go b/sessionmanager/smg_event.go index c775c45a1..5245e9906 100644 --- a/sessionmanager/smg_event.go +++ b/sessionmanager/smg_event.go @@ -20,6 +20,7 @@ package sessionmanager import ( "encoding/json" + "fmt" "strconv" "time" @@ -348,3 +349,16 @@ func (self SMGenericEvent) AsLcrRequest() *engine.LcrRequest { Duration: usageStr, } } + +// AsMapStringString Converts into map[string]string, used for example as pubsub event +func (self SMGenericEvent) AsMapStringString() (map[string]string, error) { + mp := make(map[string]string) + for k, v := range self { + if strV, casts := utils.CastIfToString(v); !casts { + return nil, fmt.Errorf("Value %+v does not cast to string", v) + } else { + mp[k] = strV + } + } + return mp, nil +} diff --git a/utils/coreutils.go b/utils/coreutils.go index 9646d6e1e..549fa97ed 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -474,3 +474,32 @@ func FmtFieldWidth(source string, width int, strip, padding string, mandatory bo } return source, nil } + +// Returns the string representation of iface or error if not convertible +func CastIfToString(iface interface{}) (strVal string, casts bool) { + switch iface.(type) { + case string: + strVal = iface.(string) + casts = true + case int: + strVal = strconv.Itoa(iface.(int)) + casts = true + case int64: + strVal = strconv.FormatInt(iface.(int64), 10) + casts = true + case float64: + strVal = strconv.FormatFloat(iface.(float64), 'f', -1, 64) + casts = true + case bool: + strVal = strconv.FormatBool(iface.(bool)) + casts = true + case []uint8: + var byteVal []byte + if byteVal, casts = iface.([]byte); casts { + strVal = string(byteVal) + } + default: // Maybe we are lucky and the value converts to string + strVal, casts = iface.(string) + } + return strVal, casts +} diff --git a/utils/utils_test.go b/utils/utils_test.go index 1f5951c07..6c66c8a0d 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -579,3 +579,24 @@ func TestPaddingNotAllowed(t *testing.T) { t.Error("Expected error") } } + +func TestCastIfToString(t *testing.T) { + v := interface{}("somestr") + if sOut, casts := CastIfToString(v); !casts { + t.Error("Does not cast") + } else if sOut != "somestr" { + t.Errorf("Received: %+v", sOut) + } + v = interface{}(1) + if sOut, casts := CastIfToString(v); !casts { + t.Error("Does not cast") + } else if sOut != "1" { + t.Errorf("Received: %+v", sOut) + } + v = interface{}(1.2) + if sOut, casts := CastIfToString(v); !casts { + t.Error("Does not cast") + } else if sOut != "1.2" { + t.Errorf("Received: %+v", sOut) + } +}