Various fixes

This commit is contained in:
DanB
2016-04-24 20:47:18 +02:00
parent 44f0553e92
commit bd619afdd0
17 changed files with 155 additions and 113 deletions

View File

@@ -810,7 +810,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
cs.DerivedChargers = cache2go.CountEntries(utils.DERIVEDCHARGERS_PREFIX)
cs.LcrProfiles = cache2go.CountEntries(utils.LCR_PREFIX)
cs.Aliases = cache2go.CountEntries(utils.ALIASES_PREFIX)
if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled {
if self.CdrStatsSrv != nil {
var queueIds []string
if err := self.CdrStatsSrv.Call("CDRStatsV1.GetQueueIds", 0, &queueIds); err != nil {
return utils.NewErrServerError(err)

View File

@@ -167,7 +167,7 @@ func TestCsvLclProcessCdrDir(t *testing.T) {
break
}
for _, cdrsConn := range cdrcCfg.CdrsConns {
if cdrsConn.Address == utils.INTERNAL {
if cdrsConn.Address == utils.MetaInternal {
cdrsConn.Address = "127.0.0.1:2013"
}
}
@@ -205,7 +205,7 @@ func TestCsvLclProcessCdr3Dir(t *testing.T) {
return
}
for _, cdrsConn := range cdrcCfg.CdrsConns {
if cdrsConn.Address == utils.INTERNAL {
if cdrsConn.Address == utils.MetaInternal {
cdrsConn.Address = "127.0.0.1:2013"
}
}

View File

@@ -292,64 +292,67 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
// Conn pool towards RAL
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Pubsub connection init
var pubSubConn *rpcclient.RpcClientPool
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) {
pubSubConn = ralConn
} else {
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl)
var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
// Users connection init
var usersConn *rpcclient.RpcClientPool
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) {
pubSubConn = ralConn
} else {
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
exitChan <- true
return
if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) {
pubSubConn = ralConn
} else {
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
exitChan <- true
return
}
}
}
// Aliases connection init
var aliasesConn *rpcclient.RpcClientPool
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) {
pubSubConn = ralConn
} else {
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
exitChan <- true
return
if len(cfg.CDRSUserSConns) != 0 { // Users connection init
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) {
usersConn = ralConn
} else {
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
exitChan <- true
return
}
}
}
// Stats connection init
var statsConn *rpcclient.RpcClientPool
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) {
pubSubConn = ralConn
} else {
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
return
if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) {
aliasesConn = ralConn
} else {
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
exitChan <- true
return
}
}
}
if len(cfg.CDRSStatSConns) != 0 { // Stats connection init
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) {
statsConn = ralConn
} else {
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
return
}
}
}
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn)

View File

@@ -92,7 +92,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, balTaskChan)
go func() {
defer close(balTaskChan)
if cfg.RALsBalancer == utils.INTERNAL {
if cfg.RALsBalancer == utils.MetaInternal {
select {
case bal = <-internalBalancerChan:
internalBalancerChan <- bal // Put it back if someone else is interested about
@@ -193,12 +193,16 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
for _, chn := range waitTasks {
<-chn
}
responder := &engine.Responder{Bal: bal, ExitChan: exitChan, Stats: cdrStats}
responder := &engine.Responder{Bal: bal, ExitChan: exitChan}
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched,
Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: usersConns}
Config: cfg, Responder: responder, Users: usersConns}
apierRpcV2 := &v2.ApierV2{
ApierV1: *apierRpcV1}
if cdrStats != nil { // ToDo: Fix here properly the init of stats
responder.Stats = cdrStats
apierRpcV1.CdrStatsSrv = cdrStats
}
// internalSchedulerChan shared here
server.RpcRegister(responder)
server.RpcRegister(apierRpcV1)

View File

@@ -38,4 +38,9 @@
}
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
"save_interval": "0s", // interval to save changed stats into dataDb storage
},
}

View File

@@ -5,31 +5,31 @@
// Starts rater, cdrs and mediator connecting over internal channel
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"rals": {
"enabled": true, // enable Rater service: <true|false>
"enabled": true,
"cdrstats_conns": [
{"address": "*internal"}
], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
],
},
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"store_cdrs": false, // store cdrs in storDb
"enabled": true,
"store_cdrs": false,
"rals_conns": [
{"address": "*internal"} // address where to reach the Rater for cost calculation, empty to disable functionality: <""|*internal|x.y.z.y:1234>
{"address": "*internal"}
],
"cdrstats_conns": [
{"address": "*internal"}
], // address where to reach the cdrstats service. Empty to disable stats gathering out of mediated CDRs <""|internal|x.y.z.y:1234>
]
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
"enabled": true,
"save_interval": "1s",
},

View File

@@ -1,32 +1,40 @@
{
"listen": {
"rpc_json": ":2014", // RPC JSON listening address
"rpc_gob": ":2015", // RPC GOB listening address
"http": ":2081", // HTTP listening address
"rpc_json": ":2014",
"rpc_gob": ":2015",
"http": ":2081",
},
"rater": {
"enabled": true, // enable Rater service: <true|false>
"cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "internal",
"rals": {
"enabled": true,
"cdrstats_conns": [
{"address": "*internal"}
],
"pubsubs_conns": [
{"address": "*internal"}
],
"users_conns": [
{"address": "*internal"}
],
"aliases_conns": [
{"address": "*internal"}
],
},
"scheduler": {
"enabled": true, // start Scheduler service: <true|false>
"enabled": true,
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
"enabled": true,
},
"pubsubs": {
"enabled": true, // starts PubSub service: <true|false>.
"enabled": true,
},
"aliases": {
"enabled": true, // starts Aliases service: <true|false>.
"enabled": true,
},
"users": {

View File

@@ -1,33 +1,41 @@
{
"listen": {
"rpc_json": ":2016", // RPC JSON listening address
"rpc_gob": ":2017", // RPC GOB listening address
"http": ":2082", // HTTP listening address
"rpc_json": ":2016",
"rpc_gob": ":2017",
"http": ":2082",
},
"rater": {
"enabled": true, // enable Rater service: <true|false>
"cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "internal",
"rals": {
"enabled": true,
"cdrstats_conns": [
{"address": "*internal"}
],
"pubsubs_conns": [
{"address": "*internal"}
],
"users_conns": [
{"address": "*internal"}
],
"aliases_conns": [
{"address": "*internal"}
],
},
"scheduler": {
"enabled": true, // start Scheduler service: <true|false>
"enabled": true,
},
"cdrstats": {
"enabled": true, // starts the cdrstats service: <true|false>
"enabled": true,
},
"pubsubs": {
"enabled": true, // starts PubSub service: <true|false>.
"enabled": true,
},
"aliases": {
"enabled": true, // starts Aliases service: <true|false>.
"enabled": true,
},
"users": {

View File

@@ -7,21 +7,21 @@
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"rater_conns": [
{"server": "127.0.0.1:2014"},
{"server": "127.0.0.1:2016"}
"rals_conns": [
{"address": "127.0.0.1:2014"},
{"address": "127.0.0.1:2016"}
],
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
},
"sm_generic": {
"enabled": true,
"rater_conns": [
{"server": "127.0.0.1:2014"},
{"server": "127.0.0.1:2016"}
"rals_conns": [
{"address": "127.0.0.1:2014"},
{"address": "127.0.0.1:2016"}
],
"cdrs_conns": [
{"server": "internal"} // address where to reach CDR Server, empty to disable CDR capturing <internal|x.y.z.y:1234>
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <internal|x.y.z.y:1234>
],
},
}

View File

@@ -7,21 +7,21 @@
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"rater_conns": [
{"server": "127.0.0.1:2014"},
{"server": "127.0.0.1:2016"}
"rals_conns": [
{"address": "127.0.0.1:2014"},
{"address": "127.0.0.1:2016"}
],
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
},
"sm_generic": {
"enabled": true,
"rater_conns": [
{"server": "127.0.0.1:2014"},
{"server": "127.0.0.1:2016"}
"rals_conns": [
{"address": "127.0.0.1:2014"},
{"address": "127.0.0.1:2016"}
],
"cdrs_conns": [
{"server": "internal"} // address where to reach CDR Server, empty to disable CDR capturing <internal|x.y.z.y:1234>
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <internal|x.y.z.y:1234>
],
},

View File

@@ -3,8 +3,8 @@
"enabled": true,
"listen": "127.0.0.1:3868",
"sm_generic_conns": [
{"server": "127.0.0.1:2018"},
{"server": "127.0.0.1:2020"}
{"address": "127.0.0.1:2018"},
{"address": "127.0.0.1:2020"}
],
},
}

View File

@@ -670,7 +670,7 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti
return err
}
var client rpcclient.RpcClientConnection
if req.Address != utils.INTERNAL {
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil {
return err
}

View File

@@ -2207,7 +2207,7 @@ func TestCgrRpcAction(t *testing.T) {
trpcp := &TestRPCParameters{}
utils.RegisterRpcParams("", trpcp)
a := &Action{
ExtraParameters: `{"Address": "internal",
ExtraParameters: `{"Address": "*internal",
"Transport": "*gob",
"Method": "TestRPCParameters.Hopa",
"Attempts":1,

View File

@@ -71,6 +71,21 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, users, aliases, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
if rater == nil || reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value
rater = nil
}
if pubsub == nil || reflect.ValueOf(pubsub).IsNil() {
pubsub = nil
}
if users == nil || reflect.ValueOf(users).IsNil() {
users = nil
}
if aliases == nil || reflect.ValueOf(aliases).IsNil() {
aliases = nil
}
if stats == nil || reflect.ValueOf(stats).IsNil() {
stats = nil
}
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
}

View File

@@ -36,7 +36,7 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec str
if rpcConnCfg.Address == utils.MetaInternal {
var internalConn rpcclient.RpcClientConnection
select {
case internalConn := <-internalConnChan:
case internalConn = <-internalConnChan:
internalConnChan <- internalConn
case <-time.After(ttl):
return nil, errors.New("TTL triggered")

View File

@@ -93,7 +93,6 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
}, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
return err
}
if rs.Bal != nil {
r, e := rs.getCallCost(arg, "Responder.GetCost")
*reply, err = *r, e
@@ -497,7 +496,7 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u
}
func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
cacheKey := "GetLCR" + attrs.CgrID
cacheKey := utils.LCRCachePrefix + attrs.CgrID + attrs.RunID
if attrs.CallDescriptor.Subject == "" {
attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account
}

View File

@@ -143,7 +143,6 @@ const (
FWV = "fwv"
DRYRUN = "dry_run"
META_COMBIMED = "*combimed"
INTERNAL = "internal"
MetaInternal = "*internal"
ZERO_RATING_SUBJECT_PREFIX = "*zero"
OK = "OK"
@@ -243,6 +242,7 @@ const (
REFUND_ROUND_CACHE_PREFIX = "REFUND_ROUND_"
GET_SESS_RUNS_CACHE_PREFIX = "GET_SESS_RUNS_"
LOG_CALL_COST_CACHE_PREFIX = "LOG_CALL_COSTS_"
LCRCachePrefix = "LCR_"
ALIAS_CONTEXT_RATING = "*rating"
NOT_AVAILABLE = "N/A"
CALL = "call"