diff --git a/apier/cdrstatsv1.go b/apier/cdrstatsv1.go index 643b09aa4..a952e7194 100644 --- a/apier/cdrstatsv1.go +++ b/apier/cdrstatsv1.go @@ -45,11 +45,7 @@ func (sts *CDRStatsV1) GetQueueIds(empty string, reply *[]string) error { return sts.CdrStats.GetQueueIds(0, reply) } -type AttrReloadQueues struct { - StatsQueueIds []string -} - -func (sts *CDRStatsV1) ReloadQueues(attr AttrReloadQueues, reply *string) error { +func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error { if err := sts.CdrStats.ReloadQueues(attr.StatsQueueIds, nil); err != nil { return err } @@ -57,7 +53,7 @@ func (sts *CDRStatsV1) ReloadQueues(attr AttrReloadQueues, reply *string) error return nil } -func (sts *CDRStatsV1) ResetQueues(attr AttrReloadQueues, reply *string) error { +func (sts *CDRStatsV1) ResetQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error { if err := sts.CdrStats.ResetQueues(attr.StatsQueueIds, nil); err != nil { return err } diff --git a/apier/cdrstatsv1_local_test.go b/apier/cdrstatsv1_local_test.go index 7cfd25bd8..66db4c813 100644 --- a/apier/cdrstatsv1_local_test.go +++ b/apier/cdrstatsv1_local_test.go @@ -198,7 +198,7 @@ func TestCDRStatsLclResetMetrics(t *testing.T) { return } var reply string - if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", AttrReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil { + if err := cdrstRpc.Call("CDRStatsV1.ResetQueues", utils.AttrCDRStatsReloadQueues{StatsQueueIds: []string{"CDRST4"}}, &reply); err != nil { t.Error("Calling CDRStatsV1.ResetQueues, got error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index f30226d7d..57c10574d 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -57,18 +57,19 @@ var ( dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings") - flush = flag.Bool("flushdb", false, "Flush the database before importing") - tpid = flag.String("tpid", "", "The tariff plan id from the database") - dataPath = flag.String("path", "./", "The path to folder containing the data files") - version = flag.Bool("version", false, "Prints the application version.") - verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") - dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.") - stats = flag.Bool("stats", false, "Generates statsistics about given data.") - fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") - toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") - historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") - raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") - runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") + flush = flag.Bool("flushdb", false, "Flush the database before importing") + tpid = flag.String("tpid", "", "The tariff plan id from the database") + dataPath = flag.String("path", "./", "The path to folder containing the data files") + version = flag.Bool("version", false, "Prints the application version.") + verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") + dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.") + stats = flag.Bool("stats", false, "Generates statsistics about given data.") + fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") + toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") + historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") + raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") + cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") + runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) func main() { @@ -81,7 +82,7 @@ func main() { var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage var storDb engine.LoadStorage - var rater *rpc.Client + var rater, cdrstats *rpc.Client var loader engine.TPLoader // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb @@ -175,6 +176,19 @@ func main() { } else { log.Print("WARNING: Rates automatic cache reloading is disabled!") } + if *cdrstatsAddress != "" { // Init connection to rater so we can reload it's data + if *cdrstatsAddress == *raterAddress { + cdrstats = rater + } else { + cdrstats, err = rpc.Dial("tcp", *cdrstatsAddress) + if err != nil { + log.Fatalf("Could not connect to CDRStats API: %s", err.Error()) + return + } + } + } else { + log.Print("WARNING: CDRStats automatic data reload is disabled!") + } // write maps to database if err := loader.WriteToDatabase(*flush, *verbose); err != nil { @@ -203,7 +217,7 @@ func main() { dstIds, rplIds, rpfIds, rpAliases, lcrIds = nil, nil, nil, nil, nil // Should reload all these on flush } if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases, lcrIds, dcs}, &reply); err != nil { - log.Fatalf("Got error on cache reload: %s", err.Error()) + log.Printf("WARNING: Got error on cache reload: %s\n", err.Error()) } actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX) if len(actTmgIds) != 0 { @@ -211,9 +225,21 @@ func main() { log.Print("Reloading scheduler") } if err = rater.Call("ApierV1.ReloadScheduler", "", &reply); err != nil { - log.Fatalf("Got error on scheduler reload: %s", err.Error()) + log.Printf("WARNING: Got error on scheduler reload: %s\n", err.Error()) } } } + if cdrstats != nil { + statsQueueIds, _ := loader.GetLoadedIds(engine.CDR_STATS_PREFIX) + if len(statsQueueIds) != 0 { + if *verbose { + log.Print("Reloading CDRStats data") + } + var reply string + if err := cdrstats.Call("CDRStatsV1.ReloadQueues", utils.AttrCDRStatsReloadQueues{StatsQueueIds: statsQueueIds}, &reply); err != nil { + log.Printf("WARNING: Failed reloading stat queues, error: %s\n", err.Error()) + } + } + } } diff --git a/console/cdrstats_reload.go b/console/cdrstats_reload.go index 45faab2fa..0776aed43 100644 --- a/console/cdrstats_reload.go +++ b/console/cdrstats_reload.go @@ -18,7 +18,9 @@ along with this program. If not, see package console -import "github.com/cgrates/cgrates/apier" +import ( + "github.com/cgrates/cgrates/utils" +) func init() { c := &CmdCdrReloadQueues{ @@ -33,7 +35,7 @@ func init() { type CmdCdrReloadQueues struct { name string rpcMethod string - rpcParams *apier.AttrReloadQueues + rpcParams *utils.AttrCDRStatsReloadQueues *CommandExecuter } @@ -47,7 +49,7 @@ func (self *CmdCdrReloadQueues) RpcMethod() string { func (self *CmdCdrReloadQueues) RpcParams() interface{} { if self.rpcParams == nil { - self.rpcParams = &apier.AttrReloadQueues{} + self.rpcParams = &utils.AttrCDRStatsReloadQueues{} } return self.rpcParams } diff --git a/console/cdrstats_reset.go b/console/cdrstats_reset.go index 3c7079272..5a3f1632c 100644 --- a/console/cdrstats_reset.go +++ b/console/cdrstats_reset.go @@ -18,7 +18,7 @@ along with this program. If not, see package console -import "github.com/cgrates/cgrates/apier" +import "github.com/cgrates/cgrates/utils" func init() { c := &CmdCdrResetQueues{ @@ -33,7 +33,7 @@ func init() { type CmdCdrResetQueues struct { name string rpcMethod string - rpcParams *apier.AttrReloadQueues + rpcParams *utils.AttrCDRStatsReloadQueues *CommandExecuter } @@ -47,7 +47,7 @@ func (self *CmdCdrResetQueues) RpcMethod() string { func (self *CmdCdrResetQueues) RpcParams() interface{} { if self.rpcParams == nil { - self.rpcParams = &apier.AttrReloadQueues{} + self.rpcParams = &utils.AttrCDRStatsReloadQueues{} } return self.rpcParams } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 7c69c09be..4a190aca1 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -455,3 +455,7 @@ func NewDTAFromAccountKey(accountKey string) (*DirectionTenantAccount, error) { type DirectionTenantAccount struct { Direction, Tenant, Account string } + +type AttrCDRStatsReloadQueues struct { + StatsQueueIds []string +}