mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-23 16:18:44 +05:00
Merge branch 'master' of https://github.com/cgrates/cgrates
This commit is contained in:
@@ -67,6 +67,7 @@ var (
|
||||
exitChan = make(chan bool)
|
||||
server = &engine.Server{}
|
||||
scribeServer history.Scribe
|
||||
pubSubServer engine.PublisherSubscriber
|
||||
cdrServer *engine.CdrServer
|
||||
cdrStats *engine.Stats
|
||||
cfg *config.CGRConfig
|
||||
@@ -336,7 +337,7 @@ func startHistoryServer(chanDone chan struct{}) {
|
||||
// chanStartServer will report when server is up, useful for internal requests
|
||||
func startHistoryAgent(chanServerStarted chan struct{}) {
|
||||
if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
|
||||
//engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Timeout waiting for server to start."))
|
||||
@@ -363,6 +364,45 @@ func startHistoryAgent(chanServerStarted chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
func startPubSubServer(chanDone chan struct{}, accountDb engine.AccountingStorage) {
|
||||
if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
server.RpcRegisterName("PubSubV1", pubSubServer)
|
||||
close(chanDone)
|
||||
}
|
||||
|
||||
// chanStartServer will report when server is up, useful for internal requests
|
||||
func startPubSubAgent(chanServerStarted chan struct{}, accountDb engine.AccountingStorage) {
|
||||
if cfg.PubSubServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Timeout waiting for server to start."))
|
||||
exitChan <- true
|
||||
return
|
||||
case <-chanServerStarted:
|
||||
}
|
||||
//<-chanServerStarted // If server is not enabled, will have deadlock here
|
||||
} else { // Connect in iteration since there are chances of concurrency here
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
|
||||
//engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
|
||||
if pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify); err == nil {
|
||||
break //Connected so no need to reiterate
|
||||
} else if i == 2 && err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubAgent> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
time.Sleep(delay())
|
||||
}
|
||||
}
|
||||
engine.SetPubSub(pubSubServer) // scribeServer comes from global variable
|
||||
return
|
||||
}
|
||||
|
||||
// Starts the rpc server, waiting for the necessary components to finish their tasks
|
||||
func serveRpc(rpcWaitChans []chan struct{}) {
|
||||
for _, chn := range rpcWaitChans {
|
||||
@@ -579,6 +619,18 @@ func main() {
|
||||
go startHistoryAgent(histServChan)
|
||||
}
|
||||
|
||||
var pubsubServChan chan struct{} // Will be initialized only if the server starts
|
||||
if cfg.PubSubServerEnabled {
|
||||
pubsubServChan = make(chan struct{})
|
||||
rpcWait = append(rpcWait, pubsubServChan)
|
||||
go startPubSubServer(pubsubServChan, accountDb)
|
||||
}
|
||||
|
||||
if cfg.PubSubAgentEnabled {
|
||||
engine.Logger.Info("Starting CGRateS PubSub Agent.")
|
||||
go startPubSubAgent(pubsubServChan, accountDb)
|
||||
}
|
||||
|
||||
var cdrsChan chan struct{}
|
||||
if cfg.CDRSEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDRS service.")
|
||||
|
||||
@@ -49,7 +49,9 @@ func generalSignalHandler() {
|
||||
sig := <-c
|
||||
engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig))
|
||||
var dummyInt int
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
if cdrStats != nil {
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
@@ -58,6 +59,7 @@ var (
|
||||
tenant = flag.String("tenant", "cgrates.org", "The type of record to use in queries.")
|
||||
subject = flag.String("subject", "1001", "The rating subject to use in queries.")
|
||||
destination = flag.String("destination", "1002", "The destination to use in queries.")
|
||||
json = flag.Bool("json", false, "Use JSON RPC")
|
||||
|
||||
nilDuration = time.Duration(0)
|
||||
)
|
||||
@@ -107,7 +109,14 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
|
||||
|
||||
func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) {
|
||||
result := engine.CallCost{}
|
||||
client, err := rpc.Dial("tcp", *raterAddress)
|
||||
var client *rpc.Client
|
||||
var err error
|
||||
if *json {
|
||||
client, err = jsonrpc.Dial("tcp", *raterAddress)
|
||||
} else {
|
||||
client, err = rpc.Dial("tcp", *raterAddress)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nilDuration, fmt.Errorf("Could not connect to engine: %s", err.Error())
|
||||
}
|
||||
|
||||
@@ -213,7 +213,10 @@ type CGRConfig struct {
|
||||
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
HistoryServerEnabled bool // Starts History as server: <true|false>.
|
||||
HistoryDir string // Location on disk where to store history files.
|
||||
HistorySaveInterval time.Duration // The timout duration between history writes
|
||||
HistorySaveInterval time.Duration // The timout duration between pubsub writes
|
||||
PubSubAgentEnabled bool // Starts PubSub as an agent: <true|false>.
|
||||
PubSubServer string // Address where to reach the master pubsub server: <internal|x.y.z.y:1234>
|
||||
PubSubServerEnabled bool // Starts PubSub as server: <true|false>.
|
||||
MailerServer string // The server to use when sending emails out
|
||||
MailerAuthUser string // Authenticate to email server using this user
|
||||
MailerAuthPass string // Authenticate to email server with this password
|
||||
@@ -319,6 +322,10 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
if self.HistoryAgentEnabled && !self.HistoryServerEnabled {
|
||||
return errors.New("HistoryServer not enabled but referenced by HistoryAgent component")
|
||||
}
|
||||
// PubSubAgent
|
||||
if self.PubSubAgentEnabled && !self.PubSubServerEnabled {
|
||||
return errors.New("PubSubServer not enabled but referenced by PubSubAgent component")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -411,6 +418,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
return err
|
||||
}
|
||||
|
||||
jsnPubSubServCfg, err := jsnCfg.PubSubServJsonCfg()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jsnPubSubAgentCfg, err := jsnCfg.PubSubAgentJsonCfg()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jsnMailerCfg, err := jsnCfg.MailerJsonCfg()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -678,6 +695,21 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
}
|
||||
}
|
||||
|
||||
if jsnPubSubAgentCfg != nil {
|
||||
if jsnPubSubAgentCfg.Enabled != nil {
|
||||
self.PubSubAgentEnabled = *jsnPubSubAgentCfg.Enabled
|
||||
}
|
||||
if jsnPubSubAgentCfg.Server != nil {
|
||||
self.PubSubServer = *jsnPubSubAgentCfg.Server
|
||||
}
|
||||
}
|
||||
|
||||
if jsnPubSubServCfg != nil {
|
||||
if jsnPubSubServCfg.Enabled != nil {
|
||||
self.PubSubServerEnabled = *jsnPubSubServCfg.Enabled
|
||||
}
|
||||
}
|
||||
|
||||
if jsnMailerCfg != nil {
|
||||
if jsnMailerCfg.Server != nil {
|
||||
self.MailerServer = *jsnMailerCfg.Server
|
||||
|
||||
@@ -243,6 +243,17 @@ const CGRATES_CFG_JSON = `
|
||||
"server": "internal", // address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
},
|
||||
|
||||
"pubsub_server": {
|
||||
"enabled": false, // starts History service: <true|false>.
|
||||
"save_interval": "1s", // interval to save changed cache into .git archive
|
||||
},
|
||||
|
||||
|
||||
"pubsub_agent": {
|
||||
"enabled": false, // starts PubSub as a client: <true|false>.
|
||||
"server": "internal", // address where to reach the master pubsub server: <internal|x.y.z.y:1234>
|
||||
},
|
||||
|
||||
|
||||
"mailer": {
|
||||
"server": "localhost", // the server to use when sending emails out
|
||||
|
||||
@@ -20,9 +20,10 @@ package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/DisposaBoy/JsonConfigReader"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/DisposaBoy/JsonConfigReader"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -48,6 +49,8 @@ const (
|
||||
OSIPS_JSN = "opensips"
|
||||
HISTSERV_JSN = "history_server"
|
||||
HISTAGENT_JSN = "history_agent"
|
||||
PUBSUBSERV_JSN = "pubsub_server"
|
||||
PUBSUBAGENT_JSN = "pubsub_agent"
|
||||
MAILER_JSN = "mailer"
|
||||
)
|
||||
|
||||
@@ -254,6 +257,30 @@ func (self CgrJsonCfg) HistAgentJsonCfg() (*HistAgentJsonCfg, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) PubSubServJsonCfg() (*PubSubServJsonCfg, error) {
|
||||
rawCfg, hasKey := self[PUBSUBSERV_JSN]
|
||||
if !hasKey {
|
||||
return nil, nil
|
||||
}
|
||||
cfg := new(PubSubServJsonCfg)
|
||||
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) PubSubAgentJsonCfg() (*PubSubAgentJsonCfg, error) {
|
||||
rawCfg, hasKey := self[PUBSUBAGENT_JSN]
|
||||
if !hasKey {
|
||||
return nil, nil
|
||||
}
|
||||
cfg := new(PubSubAgentJsonCfg)
|
||||
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) MailerJsonCfg() (*MailerJsonCfg, error) {
|
||||
rawCfg, hasKey := self[MAILER_JSN]
|
||||
if !hasKey {
|
||||
|
||||
@@ -406,6 +406,29 @@ func TestDfHistAgentJsonCfg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDfPubSubServJsonCfg(t *testing.T) {
|
||||
eCfg := &PubSubServJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.PubSubServJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
t.Error("Received: ", cfg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDfPubSubAgentJsonCfg(t *testing.T) {
|
||||
eCfg := &PubSubAgentJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Server: utils.StringPointer("internal"),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.PubSubAgentJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
t.Error("Received: ", cfg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDfMailerJsonCfg(t *testing.T) {
|
||||
eCfg := &MailerJsonCfg{
|
||||
Server: utils.StringPointer("localhost"),
|
||||
@@ -463,6 +486,11 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) {
|
||||
} else if cfg != nil {
|
||||
t.Error("Received: ", cfg)
|
||||
}
|
||||
if cfg, err := cgrJsonCfg.PubSubAgentJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if cfg != nil {
|
||||
t.Error("Received: ", cfg)
|
||||
}
|
||||
eCfgSmFs := &SmFsJsonCfg{
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Connections: &[]*FsConnJsonCfg{
|
||||
|
||||
@@ -217,6 +217,17 @@ type HistAgentJsonCfg struct {
|
||||
Server *string
|
||||
}
|
||||
|
||||
// PubSub server config section
|
||||
type PubSubServJsonCfg struct {
|
||||
Enabled *bool
|
||||
}
|
||||
|
||||
// PubSub agent config section
|
||||
type PubSubAgentJsonCfg struct {
|
||||
Enabled *bool
|
||||
Server *string
|
||||
}
|
||||
|
||||
// Mailer config section
|
||||
type MailerJsonCfg struct {
|
||||
Server *string
|
||||
|
||||
64
console/publish.go
Normal file
64
console/publish.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
|
||||
func init() {
|
||||
c := &CmdPublish{
|
||||
name: "publish",
|
||||
rpcMethod: "PubSubV1.Publish",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
type CmdPublish struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *engine.PublishInfo
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdPublish) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdPublish) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdPublish) RpcParams(ptr bool) interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &engine.PublishInfo{}
|
||||
}
|
||||
if ptr {
|
||||
return self.rpcParams
|
||||
}
|
||||
return *self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdPublish) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdPublish) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
64
console/show_subscribers.go
Normal file
64
console/show_subscribers.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
|
||||
func init() {
|
||||
c := &CmdShowSubscribers{
|
||||
name: "show_subscribers",
|
||||
rpcMethod: "PubSubV1.ShowSubscribers",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
type CmdShowSubscribers struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *StringWrapper
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) RpcParams(ptr bool) interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &StringWrapper{}
|
||||
}
|
||||
if ptr {
|
||||
return self.rpcParams
|
||||
}
|
||||
return *self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdShowSubscribers) RpcResult() interface{} {
|
||||
var s map[string]map[string]*engine.SubscriberData
|
||||
return &s
|
||||
}
|
||||
64
console/subscribe.go
Normal file
64
console/subscribe.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
|
||||
func init() {
|
||||
c := &CmdSubscribe{
|
||||
name: "subscribe",
|
||||
rpcMethod: "PubSubV1.Subscribe",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
type CmdSubscribe struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *engine.SubscribeInfo
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdSubscribe) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdSubscribe) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdSubscribe) RpcParams(ptr bool) interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &engine.SubscribeInfo{}
|
||||
}
|
||||
if ptr {
|
||||
return self.rpcParams
|
||||
}
|
||||
return *self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdSubscribe) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdSubscribe) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
64
console/unsubscribe.go
Normal file
64
console/unsubscribe.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
|
||||
func init() {
|
||||
c := &CmdUnsubscribe{
|
||||
name: "unsubscribe",
|
||||
rpcMethod: "PubSubV1.Unsubscribe",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
type CmdUnsubscribe struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *engine.SubscribeInfo
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdUnsubscribe) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdUnsubscribe) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdUnsubscribe) RpcParams(ptr bool) interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &engine.SubscribeInfo{}
|
||||
}
|
||||
if ptr {
|
||||
return self.rpcParams
|
||||
}
|
||||
return *self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdUnsubscribe) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdUnsubscribe) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
@@ -222,6 +222,15 @@
|
||||
// "server": "internal", // address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
//},
|
||||
|
||||
//"pubsub_server": {
|
||||
// "enabled": false, // starts pubsub service: <true|false>.
|
||||
//},
|
||||
|
||||
|
||||
//"pubsub_agent": {
|
||||
// "enabled": false, // starts pubsub as a client: <true|false>.
|
||||
// "server": "internal", // address where to reach the master pubsub server: <internal|x.y.z.y:1234>
|
||||
//},
|
||||
|
||||
//"mailer": {
|
||||
// "server": "localhost", // the server to use when sending emails out
|
||||
|
||||
@@ -17,4 +17,14 @@
|
||||
"scheduler": {
|
||||
"enabled": true, // start Scheduler service: <true|false>
|
||||
},
|
||||
|
||||
"pubsub_server": {
|
||||
"enabled": true, // starts pubsub service: <true|false>.
|
||||
},
|
||||
|
||||
|
||||
"pubsub_agent": {
|
||||
"enabled": true, // starts pubsub as a client: <true|false>.
|
||||
"server": "internal", // address where to reach the master pubsub server: <internal|x.y.z.y:1234>
|
||||
},
|
||||
}
|
||||
|
||||
@@ -21,13 +21,12 @@ package engine
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
//"log"
|
||||
|
||||
"log/syslog"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
//"encoding/json"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -68,6 +67,7 @@ var (
|
||||
debitPeriod = 10 * time.Second
|
||||
globalRoundingDecimals = 10
|
||||
historyScribe history.Scribe
|
||||
pubSubServer PublisherSubscriber
|
||||
//historyScribe, _ = history.NewMockScribe()
|
||||
)
|
||||
|
||||
@@ -104,6 +104,10 @@ func SetHistoryScribe(scribe history.Scribe) {
|
||||
historyScribe = scribe
|
||||
}
|
||||
|
||||
func SetPubSub(ps PublisherSubscriber) {
|
||||
pubSubServer = ps
|
||||
}
|
||||
|
||||
/*
|
||||
The input stucture that contains call information.
|
||||
*/
|
||||
|
||||
184
engine/pubsub.go
Normal file
184
engine/pubsub.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
type SubscribeInfo struct {
|
||||
EventName string
|
||||
EventFilter string
|
||||
Transport string
|
||||
Address string
|
||||
LifeSpan time.Duration
|
||||
}
|
||||
|
||||
type PublishInfo struct {
|
||||
Event map[string]string
|
||||
}
|
||||
|
||||
type PublisherSubscriber interface {
|
||||
Subscribe(SubscribeInfo, *string) error
|
||||
Unsubscribe(SubscribeInfo, *string) error
|
||||
Publish(PublishInfo, *string) error
|
||||
ShowSubscribers(string, *map[string]map[string]*SubscriberData) error
|
||||
}
|
||||
|
||||
type SubscriberData struct {
|
||||
ExpTime time.Time
|
||||
Filters utils.RSRFields
|
||||
}
|
||||
|
||||
type PubSub struct {
|
||||
subscribers map[string]map[string]*SubscriberData
|
||||
ttlVerify bool
|
||||
pubFunc func(string, bool, interface{}) ([]byte, error)
|
||||
mux *sync.Mutex
|
||||
accountDb AccountingStorage
|
||||
}
|
||||
|
||||
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
|
||||
ps := &PubSub{
|
||||
ttlVerify: ttlVerify,
|
||||
subscribers: make(map[string]map[string]*SubscriberData),
|
||||
pubFunc: utils.HttpJsonPost,
|
||||
mux: &sync.Mutex{},
|
||||
accountDb: accountDb,
|
||||
}
|
||||
// load subscribers
|
||||
if subs, err := accountDb.GetPubSubSubscribers(); err == nil {
|
||||
ps.subscribers = subs
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
func (ps *PubSub) saveSubscribers(key string) {
|
||||
if key != "" {
|
||||
if _, found := ps.subscribers[key]; !found {
|
||||
return
|
||||
}
|
||||
if err := accountingStorage.SetPubSubSubscribers(key, ps.subscribers[key]); err != nil {
|
||||
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
|
||||
}
|
||||
} else { // save all
|
||||
for key, valueMap := range ps.subscribers {
|
||||
if err := accountingStorage.SetPubSubSubscribers(key, valueMap); err != nil {
|
||||
Logger.Err("<PubSub> Error saving subscribers: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PubSub) Subscribe(si SubscribeInfo, reply *string) error {
|
||||
ps.mux.Lock()
|
||||
defer ps.mux.Unlock()
|
||||
if si.Transport != utils.META_HTTP_POST {
|
||||
*reply = "Unsupported transport type"
|
||||
return errors.New(*reply)
|
||||
}
|
||||
if ps.subscribers[si.EventName] == nil {
|
||||
ps.subscribers[si.EventName] = make(map[string]*SubscriberData)
|
||||
}
|
||||
var expTime time.Time
|
||||
if si.LifeSpan > 0 {
|
||||
expTime = time.Now().Add(si.LifeSpan)
|
||||
}
|
||||
rsr, err := utils.ParseRSRFields(si.EventFilter, utils.INFIELD_SEP)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
ps.subscribers[si.EventName][utils.InfieldJoin(si.Transport, si.Address)] = &SubscriberData{
|
||||
ExpTime: expTime,
|
||||
Filters: rsr,
|
||||
}
|
||||
ps.saveSubscribers(si.EventName)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
|
||||
ps.mux.Lock()
|
||||
defer ps.mux.Unlock()
|
||||
if si.Transport != utils.META_HTTP_POST {
|
||||
*reply = "Unsupported transport type"
|
||||
return errors.New(*reply)
|
||||
}
|
||||
delete(ps.subscribers[si.EventName], utils.InfieldJoin(si.Transport, si.Address))
|
||||
ps.saveSubscribers(si.EventName)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
ps.mux.Lock()
|
||||
defer ps.mux.Unlock()
|
||||
subs := ps.subscribers[pi.Event["EventName"]]
|
||||
for transportAddress, subData := range subs {
|
||||
split := utils.InfieldSplit(transportAddress)
|
||||
if len(split) != 2 {
|
||||
Logger.Warning("<PubSub> Wrong transport;address pair: " + transportAddress)
|
||||
continue
|
||||
}
|
||||
transport := split[0]
|
||||
address := split[1]
|
||||
if !subData.ExpTime.IsZero() && subData.ExpTime.Before(time.Now()) {
|
||||
delete(subs, transportAddress)
|
||||
ps.saveSubscribers(pi.Event["EventName"])
|
||||
continue // subscription expired, do not send event
|
||||
}
|
||||
switch transport {
|
||||
case utils.META_HTTP_POST:
|
||||
go func() {
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
|
||||
if _, err := ps.pubFunc(address, ps.ttlVerify, pi.Event); err == nil {
|
||||
break // Success, no need to reinterate
|
||||
} else if i == 4 { // Last iteration, syslog the warning
|
||||
Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), pi.Event["EventName"]))
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) ShowSubscribers(in string, out *map[string]map[string]*SubscriberData) error {
|
||||
*out = ps.subscribers
|
||||
return nil
|
||||
}
|
||||
|
||||
type ProxyPubSub struct {
|
||||
Client *rpcclient.RpcClient
|
||||
}
|
||||
|
||||
func NewProxyPubSub(addr string, reconnects int) (*ProxyPubSub, error) {
|
||||
client, err := rpcclient.NewRpcClient("tcp", addr, reconnects, utils.GOB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ProxyPubSub{Client: client}, nil
|
||||
}
|
||||
|
||||
func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error {
|
||||
return ps.Client.Call("PubSubV1.Subscribe", si, reply)
|
||||
}
|
||||
func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
|
||||
return ps.Client.Call("PubSubV1.Unsubscribe", si, reply)
|
||||
}
|
||||
func (ps *ProxyPubSub) Publish(pi PublishInfo, reply *string) error {
|
||||
return ps.Client.Call("PubSubV1.Publish", pi, reply)
|
||||
}
|
||||
|
||||
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]map[string]*SubscriberData) error {
|
||||
return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply)
|
||||
}
|
||||
208
engine/pubsub_test.go
Normal file
208
engine/pubsub_test.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || subData.ExpTime.IsZero() {
|
||||
t.Error("Error adding subscriber: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
subs, err := accountingStorage.GetPubSubSubscribers()
|
||||
if err != nil || len(subs["test"]) != 1 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeNoTransport(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: "test",
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err == nil {
|
||||
t.Error("Error subscribing error: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeNoExpire(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: 0,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if subData, exists := ps.subscribers["test"][utils.InfieldJoin(utils.META_HTTP_POST, "url")]; !exists || !subData.ExpTime.IsZero() {
|
||||
t.Error("Error adding no expire subscriber: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if err := ps.Unsubscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
}, &r); err != nil {
|
||||
t.Error("Error unsubscribing: ", err)
|
||||
}
|
||||
if _, exists := ps.subscribers["test"]["url"]; exists {
|
||||
t.Error("Error adding subscriber: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribeSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, false)
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if err := ps.Unsubscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
}, &r); err != nil {
|
||||
t.Error("Error unsubscribing: ", err)
|
||||
}
|
||||
subs, err := accountingStorage.GetPubSubSubscribers()
|
||||
if err != nil || len(subs["test"]) != 0 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
|
||||
obj.(map[string]string)["called"] = url
|
||||
return nil, nil
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: time.Second,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
m := make(map[string]string)
|
||||
m["EventName"] = "test"
|
||||
if err := ps.Publish(PublishInfo{
|
||||
Event: m,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error publishing: ", err)
|
||||
}
|
||||
for i := 0; i < 1000; i++ { // wait for the theread to populate map
|
||||
if len(m) == 1 {
|
||||
time.Sleep(time.Microsecond)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if r, exists := m["called"]; !exists || r != "url" {
|
||||
t.Error("Error calling publish function: ", m)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishExpired(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
|
||||
m := obj.(map[string]string)
|
||||
m["called"] = "yes"
|
||||
return nil, nil
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: 1,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
if err := ps.Publish(PublishInfo{
|
||||
Event: map[string]string{"EventName": "test"},
|
||||
}, &r); err != nil {
|
||||
t.Error("Error publishing: ", err)
|
||||
}
|
||||
if len(ps.subscribers["test"]) != 0 {
|
||||
t.Error("Error removing expired subscribers: ", ps.subscribers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishExpiredSave(t *testing.T) {
|
||||
ps := NewPubSub(accountingStorage, true)
|
||||
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
|
||||
m := obj.(map[string]string)
|
||||
m["called"] = "yes"
|
||||
return nil, nil
|
||||
}
|
||||
var r string
|
||||
if err := ps.Subscribe(SubscribeInfo{
|
||||
EventName: "test",
|
||||
Transport: utils.META_HTTP_POST,
|
||||
Address: "url",
|
||||
LifeSpan: 1,
|
||||
}, &r); err != nil {
|
||||
t.Error("Error subscribing: ", err)
|
||||
}
|
||||
subs, err := accountingStorage.GetPubSubSubscribers()
|
||||
if err != nil || len(subs["test"]) != 1 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
if err := ps.Publish(PublishInfo{
|
||||
Event: map[string]string{"EventName": "test"},
|
||||
}, &r); err != nil {
|
||||
t.Error("Error publishing: ", err)
|
||||
}
|
||||
subs, err = accountingStorage.GetPubSubSubscribers()
|
||||
if err != nil || len(subs["test"]) != 0 {
|
||||
t.Error("Error saving subscribers: ", err, subs)
|
||||
}
|
||||
}
|
||||
@@ -79,6 +79,8 @@ type AccountingStorage interface {
|
||||
SetAccount(*Account) error
|
||||
GetCdrStatsQueue(string) (*StatsQueue, error)
|
||||
SetCdrStatsQueue(*StatsQueue) error
|
||||
GetPubSubSubscribers() (map[string]map[string]*SubscriberData, error)
|
||||
SetPubSubSubscribers(string, map[string]*SubscriberData) error
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
|
||||
@@ -561,6 +561,24 @@ func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
|
||||
result = make(map[string]map[string]*SubscriberData)
|
||||
for key, value := range ms.dict {
|
||||
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
|
||||
subs := make(map[string]*SubscriberData)
|
||||
if err = ms.ms.Unmarshal(value, &subs); err == nil {
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (ms *MapStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
|
||||
result, err := ms.ms.Marshal(subs)
|
||||
ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
if values, ok := ms.dict[utils.ACTION_TIMING_PREFIX+key]; ok {
|
||||
err = ms.ms.Unmarshal(values, &ats)
|
||||
|
||||
@@ -689,6 +689,30 @@ func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetPubSubSubscribers() (result map[string]map[string]*SubscriberData, err error) {
|
||||
keys, err := rs.db.Keys(utils.PUBSUB_SUBSCRIBERS_PREFIX + "*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = make(map[string]map[string]*SubscriberData)
|
||||
for _, key := range keys {
|
||||
if values, err := rs.db.Get(key); err == nil {
|
||||
subs := make(map[string]*SubscriberData)
|
||||
err = rs.ms.Unmarshal(values, &subs)
|
||||
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = subs
|
||||
} else {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetPubSubSubscribers(key string, subs map[string]*SubscriberData) (err error) {
|
||||
result, err := rs.ms.Marshal(subs)
|
||||
rs.db.Set(utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetActionPlans(key string) (ats ActionPlans, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(utils.ACTION_TIMING_PREFIX + key); err == nil {
|
||||
|
||||
1
test.sh
1
test.sh
@@ -38,4 +38,3 @@ go test github.com/cgrates/cgrates/cdre
|
||||
cdre=$?
|
||||
|
||||
exit $v1 && $v2 && $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $ut && $hs && $c2g && $cdre
|
||||
|
||||
|
||||
@@ -165,6 +165,7 @@ const (
|
||||
LCR_PREFIX = "lcr_"
|
||||
DERIVEDCHARGERS_PREFIX = "dcs_"
|
||||
CDR_STATS_QUEUE_PREFIX = "csq_"
|
||||
PUBSUB_SUBSCRIBERS_PREFIX = "pss_"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
|
||||
@@ -274,6 +274,14 @@ func AccountAliasKey(tenant, account string) string {
|
||||
return ConcatenatedKey(tenant, account)
|
||||
}
|
||||
|
||||
func InfieldJoin(vals ...string) string {
|
||||
return strings.Join(vals, INFIELD_SEP)
|
||||
}
|
||||
|
||||
func InfieldSplit(val string) []string {
|
||||
return strings.Split(val, INFIELD_SEP)
|
||||
}
|
||||
|
||||
func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) {
|
||||
body, err := json.Marshal(content)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user