Adding attributes_conns in rals and cdrs constructors

This commit is contained in:
DanB
2017-12-11 18:24:57 +01:00
parent dbedf7b5ab
commit 61f2cfaf5f
5 changed files with 61 additions and 20 deletions

View File

@@ -391,11 +391,11 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cdrDb engine.CdrStorage, dm *engine.DataManager,
internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool
var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
@@ -414,6 +414,16 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CDRSAttributeSConns) != 0 { // Users connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSAttributeSConns, internalAttributeSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.AttributeS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.CDRSUserSConns) != 0 { // Users connection init
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
@@ -460,7 +470,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
}
}
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn,
usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn)
attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn)
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHandlersToServer(server)
@@ -904,8 +914,9 @@ func main() {
// Start rater service
if cfg.RALsEnabled {
go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan,
internalCdrStatSChan, internalStatSChan,
internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalStatSChan, internalHistorySChan,
internalPubSubSChan, internalAttributeSChan,
internalUserSChan, internalAliaseSChan,
srvManager, server, dm, loadDb, cdrDb, &stopHandled, exitChan)
}
@@ -917,8 +928,9 @@ func main() {
// Start CDR Server
if cfg.CDRSEnabled {
go startCDRS(internalCdrSChan, cdrDb, dm,
internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalThresholdSChan, internalStatSChan, server, exitChan)
internalRaterChan, internalPubSubSChan, internalAttributeSChan,
internalUserSChan, internalAliaseSChan, internalCdrStatSChan,
internalThresholdSChan, internalStatSChan, server, exitChan)
}
// Start CDR Stats server

View File

@@ -32,8 +32,8 @@ import (
// Starts rater and reports on chan
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{},
internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection,
internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan, internalPubSubSChan,
internalAttributeSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection,
serviceManager *servmanager.ServiceManager, server *utils.Server,
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
var waitTasks []chan struct{}
@@ -112,7 +112,9 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
// ToDo: Add here timings
if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil {
if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs,
atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs,
stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Cache rating error: %s", err.Error()))
exitChan <- true
return
@@ -189,7 +191,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer close(pubsubTaskChan)
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to PubSubS: %s", err.Error()))
exitChan <- true
@@ -200,6 +203,24 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}()
}
var attrS *rpcclient.RpcClientPool
if len(cfg.RALsAttributeSConns) != 0 { // Connections to AttributeS
attrsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, attrsTaskChan)
go func() {
defer close(attrsTaskChan)
attrS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts,
cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsAttributeSConns, internalAttributeSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to %s, error: %s",
utils.AttributeS, err.Error()))
exitChan <- true
return
}
}()
}
if len(cfg.RALsAliasSConns) != 0 { // Connection to AliasService
aliasesTaskChan := make(chan struct{})
waitTasks = append(waitTasks, aliasesTaskChan)
@@ -243,8 +264,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
if thdS != nil {
engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS
}
if attrS != nil {
responder.AttributeS = attrS
}
if cdrStats != nil { // ToDo: Fix here properly the init of stats
responder.Stats = cdrStats
responder.CdrStats = cdrStats
apierRpcV1.CdrStatsSrv = cdrStats
}
if usersConns != nil {

View File

@@ -69,14 +69,17 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users,
aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub,
attrs, users, aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
rater = nil
}
if pubsub != nil && reflect.ValueOf(pubsub).IsNil() {
pubsub = nil
}
if attrs != nil && reflect.ValueOf(attrs).IsNil() {
attrs = nil
}
if users != nil && reflect.ValueOf(users).IsNil() {
users = nil
}
@@ -104,6 +107,7 @@ type CdrServer struct {
dm *DataManager
rals rpcclient.RpcClientConnection
pubsub rpcclient.RpcClientConnection
attrS rpcclient.RpcClientConnection
users rpcclient.RpcClientConnection
aliases rpcclient.RpcClientConnection
cdrstats rpcclient.RpcClientConnection

View File

@@ -47,7 +47,8 @@ type AttrGetLcr struct {
type Responder struct {
ExitChan chan bool
Stats rpcclient.RpcClientConnection
CdrStats rpcclient.RpcClientConnection
AttributeS rpcclient.RpcClientConnection
Timeout time.Duration
Timezone string
MaxComputedUsage map[string]time.Duration
@@ -554,7 +555,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
if !rs.usageAllowed(cd.TOR, cd.GetDuration()) {
return utils.ErrMaxUsageExceeded
}
lcrCost, err := attrs.CallDescriptor.GetLCR(rs.Stats, attrs.LCRFilter, attrs.Paginator)
lcrCost, err := attrs.CallDescriptor.GetLCR(rs.CdrStats, attrs.LCRFilter, attrs.Paginator)
if err != nil {
rs.getCache().Cache(cacheKey, &cache.CacheItem{Err: err})
return err

View File

@@ -181,7 +181,7 @@ func TestResponderGetSessionRuns(t *testing.T) {
}
func TestResponderGetLCR(t *testing.T) {
rsponder.Stats = NewStats(dm, 0) // Load stats instance
rsponder.CdrStats = NewStats(dm, 0) // Load stats instance
dstDe := &Destination{Id: "GERMANY", Prefixes: []string{"+49"}}
if err := dm.DataDB().SetDestination(dstDe, utils.NonTransactional); err != nil {
t.Error(err)
@@ -304,7 +304,7 @@ func TestResponderGetLCR(t *testing.T) {
}
danStatsId := "dan12_stats"
var r int
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
danRpfl := &RatingProfile{Id: "*out:tenant12:call:dan12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
@@ -314,7 +314,7 @@ func TestResponderGetLCR(t *testing.T) {
}},
}
rifStatsId := "rif12_stats"
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
rifRpfl := &RatingProfile{Id: "*out:tenant12:call:rif12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
@@ -324,7 +324,7 @@ func TestResponderGetLCR(t *testing.T) {
}},
}
ivoStatsId := "ivo12_stats"
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
ivoRpfl := &RatingProfile{Id: "*out:tenant12:call:ivo12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),