diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d3a6ce206..677afb7f7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -391,11 +391,11 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, dm *engine.DataManager, - internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, + internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS CDRS service.") - var ralConn, pubSubConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool + var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) @@ -414,6 +414,16 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } + if len(cfg.CDRSAttributeSConns) != 0 { // Users connection init + attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.CDRSAttributeSConns, internalAttributeSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.AttributeS, err.Error())) + exitChan <- true + return + } + } if len(cfg.CDRSUserSConns) != 0 { // Users connection init usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) @@ -460,7 +470,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, } } cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn, - usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn) + attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn) cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHandlersToServer(server) @@ -904,8 +914,9 @@ func main() { // Start rater service if cfg.RALsEnabled { go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan, - internalCdrStatSChan, internalStatSChan, - internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, + internalCdrStatSChan, internalStatSChan, internalHistorySChan, + internalPubSubSChan, internalAttributeSChan, + internalUserSChan, internalAliaseSChan, srvManager, server, dm, loadDb, cdrDb, &stopHandled, exitChan) } @@ -917,8 +928,9 @@ func main() { // Start CDR Server if cfg.CDRSEnabled { go startCDRS(internalCdrSChan, cdrDb, dm, - internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, - internalCdrStatSChan, internalThresholdSChan, internalStatSChan, server, exitChan) + internalRaterChan, internalPubSubSChan, internalAttributeSChan, + internalUserSChan, internalAliaseSChan, internalCdrStatSChan, + internalThresholdSChan, internalStatSChan, server, exitChan) } // Start CDR Stats server diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 567476035..c07a481ff 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -32,8 +32,8 @@ import ( // Starts rater and reports on chan func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, - internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan, - internalPubSubSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection, + internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan, internalPubSubSChan, + internalAttributeSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) { var waitTasks []chan struct{} @@ -112,7 +112,9 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } // ToDo: Add here timings - if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil { + if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, + atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, + stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil { utils.Logger.Crit(fmt.Sprintf(" Cache rating error: %s", err.Error())) exitChan <- true return @@ -189,7 +191,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) exitChan <- true @@ -200,6 +203,24 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC }() } + var attrS *rpcclient.RpcClientPool + if len(cfg.RALsAttributeSConns) != 0 { // Connections to AttributeS + attrsTaskChan := make(chan struct{}) + waitTasks = append(waitTasks, attrsTaskChan) + go func() { + defer close(attrsTaskChan) + attrS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, + cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.RALsAttributeSConns, internalAttributeSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s, error: %s", + utils.AttributeS, err.Error())) + exitChan <- true + return + } + }() + } + if len(cfg.RALsAliasSConns) != 0 { // Connection to AliasService aliasesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, aliasesTaskChan) @@ -243,8 +264,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC if thdS != nil { engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS } + if attrS != nil { + responder.AttributeS = attrS + } if cdrStats != nil { // ToDo: Fix here properly the init of stats - responder.Stats = cdrStats + responder.CdrStats = cdrStats apierRpcV1.CdrStatsSrv = cdrStats } if usersConns != nil { diff --git a/engine/cdrs.go b/engine/cdrs.go index 07be8cb5d..84276cda4 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -69,14 +69,17 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users, - aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) { +func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, + attrs, users, aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) { if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code rater = nil } if pubsub != nil && reflect.ValueOf(pubsub).IsNil() { pubsub = nil } + if attrs != nil && reflect.ValueOf(attrs).IsNil() { + attrs = nil + } if users != nil && reflect.ValueOf(users).IsNil() { users = nil } @@ -104,6 +107,7 @@ type CdrServer struct { dm *DataManager rals rpcclient.RpcClientConnection pubsub rpcclient.RpcClientConnection + attrS rpcclient.RpcClientConnection users rpcclient.RpcClientConnection aliases rpcclient.RpcClientConnection cdrstats rpcclient.RpcClientConnection diff --git a/engine/responder.go b/engine/responder.go index 1057460c1..b44c82c6d 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -47,7 +47,8 @@ type AttrGetLcr struct { type Responder struct { ExitChan chan bool - Stats rpcclient.RpcClientConnection + CdrStats rpcclient.RpcClientConnection + AttributeS rpcclient.RpcClientConnection Timeout time.Duration Timezone string MaxComputedUsage map[string]time.Duration @@ -554,7 +555,7 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { if !rs.usageAllowed(cd.TOR, cd.GetDuration()) { return utils.ErrMaxUsageExceeded } - lcrCost, err := attrs.CallDescriptor.GetLCR(rs.Stats, attrs.LCRFilter, attrs.Paginator) + lcrCost, err := attrs.CallDescriptor.GetLCR(rs.CdrStats, attrs.LCRFilter, attrs.Paginator) if err != nil { rs.getCache().Cache(cacheKey, &cache.CacheItem{Err: err}) return err diff --git a/engine/responder_test.go b/engine/responder_test.go index 56f66754d..eb68d450b 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -181,7 +181,7 @@ func TestResponderGetSessionRuns(t *testing.T) { } func TestResponderGetLCR(t *testing.T) { - rsponder.Stats = NewStats(dm, 0) // Load stats instance + rsponder.CdrStats = NewStats(dm, 0) // Load stats instance dstDe := &Destination{Id: "GERMANY", Prefixes: []string{"+49"}} if err := dm.DataDB().SetDestination(dstDe, utils.NonTransactional); err != nil { t.Error(err) @@ -304,7 +304,7 @@ func TestResponderGetLCR(t *testing.T) { } danStatsId := "dan12_stats" var r int - rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) + rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) danRpfl := &RatingProfile{Id: "*out:tenant12:call:dan12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), @@ -314,7 +314,7 @@ func TestResponderGetLCR(t *testing.T) { }}, } rifStatsId := "rif12_stats" - rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) + rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) rifRpfl := &RatingProfile{Id: "*out:tenant12:call:rif12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), @@ -324,7 +324,7 @@ func TestResponderGetLCR(t *testing.T) { }}, } ivoStatsId := "ivo12_stats" - rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) + rsponder.CdrStats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) ivoRpfl := &RatingProfile{Id: "*out:tenant12:call:ivo12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),