diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 6873c60bc..3db6fe82d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -110,7 +110,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) @@ -133,7 +133,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal utils.Logger.Info("Starting CGRateS SMGeneric service.") var ralsConns, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmGenericConfig.RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -142,7 +142,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal } } if len(cfg.SmGenericConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -173,7 +173,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC utils.Logger.Info("Starting CGRateS DiameterAgent service.") var smgConn, pubsubConn *rpcclient.RpcClientPool if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 { - smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) @@ -182,7 +182,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC } } if len(cfg.DiameterAgentCfg().PubSubConns) != 0 { - pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) @@ -206,7 +206,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmFsConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -215,7 +215,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli } } if len(cfg.SmFsConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -235,7 +235,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien utils.Logger.Info("Starting CGRateS SMKamailio service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmKamConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -244,7 +244,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien } } if len(cfg.SmKamConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -264,7 +264,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien utils.Logger.Info("Starting CGRateS SMOpenSIPS service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmOsipsConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -273,7 +273,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien } } if len(cfg.SmOsipsConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SmOsipsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRs: %s", err.Error())) @@ -296,7 +296,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine utils.Logger.Info("Starting CGRateS CDRS service.") var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL - ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -305,7 +305,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init - pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) @@ -314,7 +314,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSUserSConns) != 0 { // Users connection init - usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) @@ -323,7 +323,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init - aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) @@ -333,7 +333,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } if len(cfg.CDRSStatSConns) != 0 { // Stats connection init - statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index e15a413a0..e866bc454 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -113,7 +113,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cdrstatTaskChan) go func() { defer close(cdrstatTaskChan) - cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRStatS, error: %s", err.Error())) @@ -127,7 +127,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, histTaskChan) go func() { defer close(histTaskChan) - if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect HistoryS, error: %s", err.Error())) exitChan <- true @@ -142,7 +142,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) exitChan <- true @@ -157,7 +157,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) - if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS, error: %s", err.Error())) exitChan <- true @@ -173,7 +173,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, usersTaskChan) go func() { defer close(usersTaskChan) - if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, + if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect UserS, error: %s", err.Error())) exitChan <- true diff --git a/config/libconfig_json.go b/config/libconfig_json.go index e80e50cee..872c42b8f 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -206,7 +206,8 @@ type SmFsJsonCfg struct { // Represents one connection instance towards a rater/cdrs server type HaPoolJsonCfg struct { - Address *string + Address *string + Transport *string } // Represents one connection instance towards FreeSWITCH diff --git a/config/smconfig.go b/config/smconfig.go index a1113d7a2..9205baba4 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -35,7 +35,8 @@ func NewDfltHaPoolConfig() *HaPoolConfig { // One connection to Rater type HaPoolConfig struct { - Address string + Address string + Transport string } func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { @@ -45,6 +46,9 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { if jsnCfg.Address != nil { self.Address = *jsnCfg.Address } + if jsnCfg.Transport != nil { + self.Transport = *jsnCfg.Transport + } return nil } diff --git a/engine/libengine.go b/engine/libengine.go index 65d1371ec..c322337d7 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -20,6 +20,7 @@ package engine import ( "errors" + "fmt" "time" "github.com/cgrates/cgrates/config" @@ -27,7 +28,7 @@ import ( "github.com/cgrates/rpcclient" ) -func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, codec string, +func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient var err error @@ -43,8 +44,14 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi return nil, errors.New("TTL triggered") } rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn) - } else { + } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { + codec := utils.GOB + if rpcConnCfg.Transport != "" { + codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient + } rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil) + } else { + return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) } if err == nil { atLestOneConnected = true diff --git a/utils/consts.go b/utils/consts.go index 3b2edf4f7..27c6679c8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -30,7 +30,7 @@ var ( ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND") ErrAccountDisabled = errors.New("ACCOUNT_DISABLED") ErrUserNotFound = errors.New("USER_NOT_FOUND") - ErrInsufficientCredit = errors.New("INSUFFICENT_CREDIT") + ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") ErrNotConvertible = errors.New("NOT_CONVERTIBLE") CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH} @@ -291,4 +291,6 @@ const ( SessionTTLUsage = "SessionTTLUsage" HandlerSubstractUsage = "*substract_usage" XML = "xml" + MetaGOBrpc = "*gob" + MetaJSONrpc = "*json" )