mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
cgr-loader to automatically reload cdrstats data
This commit is contained in:
@@ -45,11 +45,7 @@ func (sts *CDRStatsV1) GetQueueIds(empty string, reply *[]string) error {
|
||||
return sts.CdrStats.GetQueueIds(0, reply)
|
||||
}
|
||||
|
||||
type AttrReloadQueues struct {
|
||||
StatsQueueIds []string
|
||||
}
|
||||
|
||||
func (sts *CDRStatsV1) ReloadQueues(attr AttrReloadQueues, reply *string) error {
|
||||
func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error {
|
||||
if err := sts.CdrStats.ReloadQueues(attr.StatsQueueIds, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -57,7 +53,7 @@ func (sts *CDRStatsV1) ReloadQueues(attr AttrReloadQueues, reply *string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sts *CDRStatsV1) ResetQueues(attr AttrReloadQueues, reply *string) error {
|
||||
func (sts *CDRStatsV1) ResetQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error {
|
||||
if err := sts.CdrStats.ResetQueues(attr.StatsQueueIds, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -198,7 +198,7 @@ func TestCDRStatsLclResetMetrics(t *testing.T) {
|
||||
return
|
||||
}
|
||||
var reply string
|
||||
if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", AttrReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil {
|
||||
if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", utils.AttrCDRStatsReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil {
|
||||
t.Error("Calling CDRStatsV1.ResetQueues, got error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
|
||||
@@ -57,18 +57,19 @@ var (
|
||||
|
||||
dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings")
|
||||
|
||||
flush = flag.Bool("flushdb", false, "Flush the database before importing")
|
||||
tpid = flag.String("tpid", "", "The tariff plan id from the database")
|
||||
dataPath = flag.String("path", "./", "The path to folder containing the data files")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
|
||||
dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
|
||||
stats = flag.Bool("stats", false, "Generates statsistics about given data.")
|
||||
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
|
||||
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
|
||||
historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving")
|
||||
raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
|
||||
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
|
||||
flush = flag.Bool("flushdb", false, "Flush the database before importing")
|
||||
tpid = flag.String("tpid", "", "The tariff plan id from the database")
|
||||
dataPath = flag.String("path", "./", "The path to folder containing the data files")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
|
||||
dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
|
||||
stats = flag.Bool("stats", false, "Generates statsistics about given data.")
|
||||
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
|
||||
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
|
||||
historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving")
|
||||
raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
|
||||
cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
|
||||
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -81,7 +82,7 @@ func main() {
|
||||
var ratingDb engine.RatingStorage
|
||||
var accountDb engine.AccountingStorage
|
||||
var storDb engine.LoadStorage
|
||||
var rater *rpc.Client
|
||||
var rater, cdrstats *rpc.Client
|
||||
var loader engine.TPLoader
|
||||
// Init necessary db connections, only if not already
|
||||
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
|
||||
@@ -175,6 +176,19 @@ func main() {
|
||||
} else {
|
||||
log.Print("WARNING: Rates automatic cache reloading is disabled!")
|
||||
}
|
||||
if *cdrstatsAddress != "" { // Init connection to rater so we can reload it's data
|
||||
if *cdrstatsAddress == *raterAddress {
|
||||
cdrstats = rater
|
||||
} else {
|
||||
cdrstats, err = rpc.Dial("tcp", *cdrstatsAddress)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not connect to CDRStats API: %s", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Print("WARNING: CDRStats automatic data reload is disabled!")
|
||||
}
|
||||
|
||||
// write maps to database
|
||||
if err := loader.WriteToDatabase(*flush, *verbose); err != nil {
|
||||
@@ -203,7 +217,7 @@ func main() {
|
||||
dstIds, rplIds, rpfIds, rpAliases, lcrIds = nil, nil, nil, nil, nil // Should reload all these on flush
|
||||
}
|
||||
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases, lcrIds, dcs}, &reply); err != nil {
|
||||
log.Fatalf("Got error on cache reload: %s", err.Error())
|
||||
log.Printf("WARNING: Got error on cache reload: %s\n", err.Error())
|
||||
}
|
||||
actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)
|
||||
if len(actTmgIds) != 0 {
|
||||
@@ -211,9 +225,21 @@ func main() {
|
||||
log.Print("Reloading scheduler")
|
||||
}
|
||||
if err = rater.Call("ApierV1.ReloadScheduler", "", &reply); err != nil {
|
||||
log.Fatalf("Got error on scheduler reload: %s", err.Error())
|
||||
log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if cdrstats != nil {
|
||||
statsQueueIds, _ := loader.GetLoadedIds(engine.CDR_STATS_PREFIX)
|
||||
if len(statsQueueIds) != 0 {
|
||||
if *verbose {
|
||||
log.Print("Reloading CDRStats data")
|
||||
}
|
||||
var reply string
|
||||
if err := cdrstats.Call("CDRStatsV1.ReloadQueues", utils.AttrCDRStatsReloadQueues{StatsQueueIds: statsQueueIds}, &reply); err != nil {
|
||||
log.Printf("WARNING: Failed reloading stat queues, error: %s\n", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/apier"
|
||||
import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func init() {
|
||||
c := &CmdCdrReloadQueues{
|
||||
@@ -33,7 +35,7 @@ func init() {
|
||||
type CmdCdrReloadQueues struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *apier.AttrReloadQueues
|
||||
rpcParams *utils.AttrCDRStatsReloadQueues
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
@@ -47,7 +49,7 @@ func (self *CmdCdrReloadQueues) RpcMethod() string {
|
||||
|
||||
func (self *CmdCdrReloadQueues) RpcParams() interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &apier.AttrReloadQueues{}
|
||||
self.rpcParams = &utils.AttrCDRStatsReloadQueues{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/apier"
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func init() {
|
||||
c := &CmdCdrResetQueues{
|
||||
@@ -33,7 +33,7 @@ func init() {
|
||||
type CmdCdrResetQueues struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *apier.AttrReloadQueues
|
||||
rpcParams *utils.AttrCDRStatsReloadQueues
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ func (self *CmdCdrResetQueues) RpcMethod() string {
|
||||
|
||||
func (self *CmdCdrResetQueues) RpcParams() interface{} {
|
||||
if self.rpcParams == nil {
|
||||
self.rpcParams = &apier.AttrReloadQueues{}
|
||||
self.rpcParams = &utils.AttrCDRStatsReloadQueues{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
@@ -455,3 +455,7 @@ func NewDTAFromAccountKey(accountKey string) (*DirectionTenantAccount, error) {
|
||||
type DirectionTenantAccount struct {
|
||||
Direction, Tenant, Account string
|
||||
}
|
||||
|
||||
type AttrCDRStatsReloadQueues struct {
|
||||
StatsQueueIds []string
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user