diff --git a/apier/v1/apier.go b/apier/v1/apier.go index e6e62711f..95fba7081 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -810,7 +810,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.DerivedChargers = cache2go.CountEntries(utils.DERIVEDCHARGERS_PREFIX) cs.LcrProfiles = cache2go.CountEntries(utils.LCR_PREFIX) cs.Aliases = cache2go.CountEntries(utils.ALIASES_PREFIX) - if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled { + if self.CdrStatsSrv != nil { var queueIds []string if err := self.CdrStatsSrv.Call("CDRStatsV1.GetQueueIds", 0, &queueIds); err != nil { return utils.NewErrServerError(err) diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go index be4668599..3fd2f31f7 100644 --- a/cdrc/cdrc_local_test.go +++ b/cdrc/cdrc_local_test.go @@ -167,7 +167,7 @@ func TestCsvLclProcessCdrDir(t *testing.T) { break } for _, cdrsConn := range cdrcCfg.CdrsConns { - if cdrsConn.Address == utils.INTERNAL { + if cdrsConn.Address == utils.MetaInternal { cdrsConn.Address = "127.0.0.1:2013" } } @@ -205,7 +205,7 @@ func TestCsvLclProcessCdr3Dir(t *testing.T) { return } for _, cdrsConn := range cdrcCfg.CdrsConns { - if cdrsConn.Address == utils.INTERNAL { + if cdrsConn.Address == utils.MetaInternal { cdrsConn.Address = "127.0.0.1:2013" } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 00b983106..6c5f3eb04 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -292,64 +292,67 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS CDRS service.") - // Conn pool towards RAL - ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return - } - // Pubsub connection init - var pubSubConn *rpcclient.RpcClientPool - if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) { - pubSubConn = ralConn - } else { - pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl) + var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool + if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL + ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } } - // Users connection init - var usersConn *rpcclient.RpcClientPool - if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) { - pubSubConn = ralConn - } else { - usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) - exitChan <- true - return + if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) { + pubSubConn = ralConn + } else { + pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) + exitChan <- true + return + } } } - // Aliases connection init - var aliasesConn *rpcclient.RpcClientPool - if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) { - pubSubConn = ralConn - } else { - aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) - exitChan <- true - return + if len(cfg.CDRSUserSConns) != 0 { // Users connection init + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) { + usersConn = ralConn + } else { + usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) + exitChan <- true + return + } } } - // Stats connection init - var statsConn *rpcclient.RpcClientPool - if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) { - pubSubConn = ralConn - } else { - statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) - exitChan <- true - return + if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) { + aliasesConn = ralConn + } else { + aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) + exitChan <- true + return + } + } + } + + if len(cfg.CDRSStatSConns) != 0 { // Stats connection init + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) { + statsConn = ralConn + } else { + statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) + exitChan <- true + return + } } } cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 1647ee62a..9613256de 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -92,7 +92,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, balTaskChan) go func() { defer close(balTaskChan) - if cfg.RALsBalancer == utils.INTERNAL { + if cfg.RALsBalancer == utils.MetaInternal { select { case bal = <-internalBalancerChan: internalBalancerChan <- bal // Put it back if someone else is interested about @@ -193,12 +193,16 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC for _, chn := range waitTasks { <-chn } - responder := &engine.Responder{Bal: bal, ExitChan: exitChan, Stats: cdrStats} + responder := &engine.Responder{Bal: bal, ExitChan: exitChan} responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched, - Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: usersConns} + Config: cfg, Responder: responder, Users: usersConns} apierRpcV2 := &v2.ApierV2{ ApierV1: *apierRpcV1} + if cdrStats != nil { // ToDo: Fix here properly the init of stats + responder.Stats = cdrStats + apierRpcV1.CdrStatsSrv = cdrStats + } // internalSchedulerChan shared here server.RpcRegister(responder) server.RpcRegister(apierRpcV1) diff --git a/data/conf/samples/apier/apier.json b/data/conf/samples/apier/apier.json index 30a533b19..8234d67b0 100644 --- a/data/conf/samples/apier/apier.json +++ b/data/conf/samples/apier/apier.json @@ -38,4 +38,9 @@ } }, +"cdrstats": { + "enabled": true, // starts the cdrstats service: + "save_interval": "0s", // interval to save changed stats into dataDb storage +}, + } diff --git a/data/conf/samples/cdrstats/cdrstats.json b/data/conf/samples/cdrstats/cdrstats.json index 5e47affa5..fc4d074b0 100644 --- a/data/conf/samples/cdrstats/cdrstats.json +++ b/data/conf/samples/cdrstats/cdrstats.json @@ -5,31 +5,31 @@ // Starts rater, cdrs and mediator connecting over internal channel "listen": { - "rpc_json": ":2012", // RPC JSON listening address - "rpc_gob": ":2013", // RPC GOB listening address - "http": ":2080", // HTTP listening address + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", }, "rals": { - "enabled": true, // enable Rater service: + "enabled": true, "cdrstats_conns": [ {"address": "*internal"} - ], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + ], }, "cdrs": { - "enabled": true, // start the CDR Server service: - "store_cdrs": false, // store cdrs in storDb + "enabled": true, + "store_cdrs": false, "rals_conns": [ - {"address": "*internal"} // address where to reach the Rater for cost calculation, empty to disable functionality: <""|*internal|x.y.z.y:1234> + {"address": "*internal"} ], "cdrstats_conns": [ {"address": "*internal"} - ], // address where to reach the cdrstats service. Empty to disable stats gathering out of mediated CDRs <""|internal|x.y.z.y:1234> + ] }, "cdrstats": { - "enabled": true, // starts the cdrstats service: + "enabled": true, "save_interval": "1s", }, diff --git a/data/conf/samples/hapool/cgrrater1/cgr.json b/data/conf/samples/hapool/cgrrater1/cgr.json index cfde7c390..d33b19f7b 100644 --- a/data/conf/samples/hapool/cgrrater1/cgr.json +++ b/data/conf/samples/hapool/cgrrater1/cgr.json @@ -1,32 +1,40 @@ { "listen": { - "rpc_json": ":2014", // RPC JSON listening address - "rpc_gob": ":2015", // RPC GOB listening address - "http": ":2081", // HTTP listening address + "rpc_json": ":2014", + "rpc_gob": ":2015", + "http": ":2081", }, -"rater": { - "enabled": true, // enable Rater service: - "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> - "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> - "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> - "aliases": "internal", +"rals": { + "enabled": true, + "cdrstats_conns": [ + {"address": "*internal"} + ], + "pubsubs_conns": [ + {"address": "*internal"} + ], + "users_conns": [ + {"address": "*internal"} + ], + "aliases_conns": [ + {"address": "*internal"} + ], }, "scheduler": { - "enabled": true, // start Scheduler service: + "enabled": true, }, "cdrstats": { - "enabled": true, // starts the cdrstats service: + "enabled": true, }, "pubsubs": { - "enabled": true, // starts PubSub service: . + "enabled": true, }, "aliases": { - "enabled": true, // starts Aliases service: . + "enabled": true, }, "users": { diff --git a/data/conf/samples/hapool/cgrrater2/cgr.json b/data/conf/samples/hapool/cgrrater2/cgr.json index dd64d4442..604f5b9ee 100644 --- a/data/conf/samples/hapool/cgrrater2/cgr.json +++ b/data/conf/samples/hapool/cgrrater2/cgr.json @@ -1,33 +1,41 @@ { "listen": { - "rpc_json": ":2016", // RPC JSON listening address - "rpc_gob": ":2017", // RPC GOB listening address - "http": ":2082", // HTTP listening address + "rpc_json": ":2016", + "rpc_gob": ":2017", + "http": ":2082", }, -"rater": { - "enabled": true, // enable Rater service: - "cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> - "pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> - "users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> - "aliases": "internal", +"rals": { + "enabled": true, + "cdrstats_conns": [ + {"address": "*internal"} + ], + "pubsubs_conns": [ + {"address": "*internal"} + ], + "users_conns": [ + {"address": "*internal"} + ], + "aliases_conns": [ + {"address": "*internal"} + ], }, "scheduler": { - "enabled": true, // start Scheduler service: + "enabled": true, }, "cdrstats": { - "enabled": true, // starts the cdrstats service: + "enabled": true, }, "pubsubs": { - "enabled": true, // starts PubSub service: . + "enabled": true, }, "aliases": { - "enabled": true, // starts Aliases service: . + "enabled": true, }, "users": { diff --git a/data/conf/samples/hapool/cgrsmg1/cgr.json b/data/conf/samples/hapool/cgrsmg1/cgr.json index 9b8ec46b1..0473fb2bc 100644 --- a/data/conf/samples/hapool/cgrsmg1/cgr.json +++ b/data/conf/samples/hapool/cgrsmg1/cgr.json @@ -7,21 +7,21 @@ "cdrs": { "enabled": true, // start the CDR Server service: - "rater_conns": [ - {"server": "127.0.0.1:2014"}, - {"server": "127.0.0.1:2016"} + "rals_conns": [ + {"address": "127.0.0.1:2014"}, + {"address": "127.0.0.1:2016"} ], "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> }, "sm_generic": { "enabled": true, - "rater_conns": [ - {"server": "127.0.0.1:2014"}, - {"server": "127.0.0.1:2016"} + "rals_conns": [ + {"address": "127.0.0.1:2014"}, + {"address": "127.0.0.1:2016"} ], "cdrs_conns": [ - {"server": "internal"} // address where to reach CDR Server, empty to disable CDR capturing + {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing ], }, } diff --git a/data/conf/samples/hapool/cgrsmg2/cgr.json b/data/conf/samples/hapool/cgrsmg2/cgr.json index c61812057..645a8ccf3 100644 --- a/data/conf/samples/hapool/cgrsmg2/cgr.json +++ b/data/conf/samples/hapool/cgrsmg2/cgr.json @@ -7,21 +7,21 @@ "cdrs": { "enabled": true, // start the CDR Server service: - "rater_conns": [ - {"server": "127.0.0.1:2014"}, - {"server": "127.0.0.1:2016"} + "rals_conns": [ + {"address": "127.0.0.1:2014"}, + {"address": "127.0.0.1:2016"} ], "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> }, "sm_generic": { "enabled": true, - "rater_conns": [ - {"server": "127.0.0.1:2014"}, - {"server": "127.0.0.1:2016"} + "rals_conns": [ + {"address": "127.0.0.1:2014"}, + {"address": "127.0.0.1:2016"} ], "cdrs_conns": [ - {"server": "internal"} // address where to reach CDR Server, empty to disable CDR capturing + {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing ], }, diff --git a/data/conf/samples/hapool/dagent/cgr.json b/data/conf/samples/hapool/dagent/cgr.json index 2f02d5669..bf06a164c 100644 --- a/data/conf/samples/hapool/dagent/cgr.json +++ b/data/conf/samples/hapool/dagent/cgr.json @@ -3,8 +3,8 @@ "enabled": true, "listen": "127.0.0.1:3868", "sm_generic_conns": [ - {"server": "127.0.0.1:2018"}, - {"server": "127.0.0.1:2020"} + {"address": "127.0.0.1:2018"}, + {"address": "127.0.0.1:2020"} ], }, } diff --git a/engine/action.go b/engine/action.go index 84669ff1f..358f87dc0 100644 --- a/engine/action.go +++ b/engine/action.go @@ -670,7 +670,7 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti return err } var client rpcclient.RpcClientConnection - if req.Address != utils.INTERNAL { + if req.Address != utils.MetaInternal { if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil { return err } diff --git a/engine/actions_test.go b/engine/actions_test.go index f9297d25a..14c3dd104 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2207,7 +2207,7 @@ func TestCgrRpcAction(t *testing.T) { trpcp := &TestRPCParameters{} utils.RegisterRpcParams("", trpcp) a := &Action{ - ExtraParameters: `{"Address": "internal", + ExtraParameters: `{"Address": "*internal", "Transport": "*gob", "Method": "TestRPCParameters.Hopa", "Attempts":1, diff --git a/engine/cdrs.go b/engine/cdrs.go index c31badcd3..7c00dd2e5 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -71,6 +71,21 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, users, aliases, stats rpcclient.RpcClientConnection) (*CdrServer, error) { + if rater == nil || reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value + rater = nil + } + if pubsub == nil || reflect.ValueOf(pubsub).IsNil() { + pubsub = nil + } + if users == nil || reflect.ValueOf(users).IsNil() { + users = nil + } + if aliases == nil || reflect.ValueOf(aliases).IsNil() { + aliases = nil + } + if stats == nil || reflect.ValueOf(stats).IsNil() { + stats = nil + } return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil } diff --git a/engine/libengine.go b/engine/libengine.go index d541de76d..b555f596a 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -36,7 +36,7 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec str if rpcConnCfg.Address == utils.MetaInternal { var internalConn rpcclient.RpcClientConnection select { - case internalConn := <-internalConnChan: + case internalConn = <-internalConnChan: internalConnChan <- internalConn case <-time.After(ttl): return nil, errors.New("TTL triggered") diff --git a/engine/responder.go b/engine/responder.go index f4a25c957..8c6009db6 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -93,7 +93,6 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { }, arg, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { return err } - if rs.Bal != nil { r, e := rs.getCallCost(arg, "Responder.GetCost") *reply, err = *r, e @@ -497,7 +496,7 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u } func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { - cacheKey := "GetLCR" + attrs.CgrID + cacheKey := utils.LCRCachePrefix + attrs.CgrID + attrs.RunID if attrs.CallDescriptor.Subject == "" { attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account } diff --git a/utils/consts.go b/utils/consts.go index 4c81a53c8..15889b7d8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -143,7 +143,6 @@ const ( FWV = "fwv" DRYRUN = "dry_run" META_COMBIMED = "*combimed" - INTERNAL = "internal" MetaInternal = "*internal" ZERO_RATING_SUBJECT_PREFIX = "*zero" OK = "OK" @@ -243,6 +242,7 @@ const ( REFUND_ROUND_CACHE_PREFIX = "REFUND_ROUND_" GET_SESS_RUNS_CACHE_PREFIX = "GET_SESS_RUNS_" LOG_CALL_COST_CACHE_PREFIX = "LOG_CALL_COSTS_" + LCRCachePrefix = "LCR_" ALIAS_CONTEXT_RATING = "*rating" NOT_AVAILABLE = "N/A" CALL = "call"