add load history for every cache save operation

Author: Radu Ioan Fericean
Date: 2016-06-29 12:29:32 +03:00
parent f3b650f888
commit ce0896d15a
15 changed files with 124 additions and 60 deletions
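
The service hunks below (startRater, startAliasesServer) now decide whether to reuse an on-disk cache dump by comparing the ID recorded in the dump against the newest load-history entry: RatingLoadID for the rating side, AccountingLoadID for the accounting side. A minimal sketch of that check for the rating side, using names that appear in the diff (utils.LoadCacheFileInfo, utils.LoadInstance); the helper shouldRebuildRatingCache is illustrative only, not part of the codebase:

package main // sketch only; the real check is written inline in startRater

import "github.com/cgrates/cgrates/utils"

// shouldRebuildRatingCache mirrors the startRater condition in this commit:
// the dump is reused only when its RatingLoadID still matches the newest
// load-history entry; any read error or missing history forces a rebuild.
func shouldRebuildRatingCache(cacheDumpDir string, loadHist []*utils.LoadInstance) bool {
	cfi, err := utils.LoadCacheFileInfo(cacheDumpDir)
	if err != nil || len(loadHist) == 0 {
		return true // no readable dump or no history: rebuild from the database
	}
	return cfi.LoadInfo.RatingLoadID != loadHist[0].RatingLoadID
}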

View File

@@ -839,10 +839,12 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
if err != nil { // Not really an error here since we only count in cache
utils.Logger.Warning(fmt.Sprintf("ApierV1.GetCacheStats, error on GetLoadHistory: %s", err.Error()))
}
cs.LastLoadId = utils.NOT_AVAILABLE
cs.LastRatingLoadID = utils.NOT_AVAILABLE
cs.LastAccountingLoadID = utils.NOT_AVAILABLE
cs.LastLoadTime = utils.NOT_AVAILABLE
} else {
cs.LastLoadId = loadHistInsts[0].LoadId
cs.LastRatingLoadID = loadHistInsts[0].RatingLoadID
cs.LastAccountingLoadID = loadHistInsts[0].AccountingLoadID
cs.LastLoadTime = loadHistInsts[0].LoadTime.Format(time.RFC3339)
}
*reply = *cs

View File

@@ -826,7 +826,7 @@ func TestApierGetCacheStats(t *testing.T) {
return
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 3, RatingPlans: 1, RatingProfiles: 2, Actions: 2, ActionPlans: 1, LastLoadId: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
expectedStats := &utils.CacheStats{Destinations: 3, RatingPlans: 1, RatingProfiles: 2, Actions: 2, ActionPlans: 1, LastRatingLoadID: utils.NOT_AVAILABLE, LastAccountingLoadID: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
var args utils.AttrCacheStats
if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
@@ -1244,7 +1244,7 @@ func TestApierResetDataBeforeLoadFromFolder(t *testing.T) {
t.Error("Calling ApierV1.ReloadCache got reply: ", reply)
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{LastLoadId: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
expectedStats := &utils.CacheStats{LastRatingLoadID: utils.NOT_AVAILABLE, LastAccountingLoadID: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
var args utils.AttrCacheStats
if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
@@ -1759,7 +1759,7 @@ func TestApierGetCacheStats2(t *testing.T) {
return
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{LastLoadId: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
expectedStats := &utils.CacheStats{LastRatingLoadID: utils.NOT_AVAILABLE, LastAccountingLoadID: utils.NOT_AVAILABLE, LastLoadTime: utils.NOT_AVAILABLE}
var args utils.AttrCacheStats
if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())

View File

@@ -103,7 +103,7 @@ func TestSMGV1LoadTariffPlanFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := smgV1Rpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &smgV1LoadInst); err != nil {
t.Error(err)
} else if smgV1LoadInst.LoadId == "" {
} else if smgV1LoadInst.RatingLoadID == "" && smgV1LoadInst.AccountingLoadID == "" {
t.Error("Empty loadId received, loadInstance: ", smgV1LoadInst)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
@@ -117,7 +117,7 @@ func TestSMGV1CacheStats(t *testing.T) {
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 8, ActionPlans: 4, SharedGroups: 1, Aliases: 1,
DerivedChargers: 1, LcrProfiles: 5, CdrStats: 6, Users: 3, LastLoadId: smgV1LoadInst.LoadId, LastLoadTime: smgV1LoadInst.LoadTime.Format(time.RFC3339)}
DerivedChargers: 1, LcrProfiles: 5, CdrStats: 6, Users: 3, LastRatingLoadID: smgV1LoadInst.RatingLoadID, LastAccountingLoadID: smgV1LoadInst.AccountingLoadID, LastLoadTime: smgV1LoadInst.LoadTime.Format(time.RFC3339)}
var args utils.AttrCacheStats
if err := smgV1Rpc.Call("ApierV2.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV2.GetCacheStats: ", err.Error())

View File

@@ -161,7 +161,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
return utils.NewErrServerError(err)
}
if attrs.DryRun {
*reply = utils.LoadInstance{LoadId: utils.DRYRUN}
*reply = utils.LoadInstance{RatingLoadID: utils.DRYRUN, AccountingLoadID: utils.DRYRUN}
return nil // Mission complete, no errors
}
@@ -225,7 +225,6 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
cstKeys, _ := loader.GetLoadedIds(utils.CDR_STATS_PREFIX)
userKeys, _ := loader.GetLoadedIds(utils.USERS_PREFIX)
li := loader.GetLoadInstance()
loader.Init() // release the tp data
if err := self.RatingDb.CacheRatingPrefixValues(map[string][]string{
utils.DESTINATION_PREFIX: dstKeys,
@@ -260,7 +259,13 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
return err
}
}
*reply = *li
loadHistList, err := self.AccountDb.GetLoadHistory(1, false)
if err != nil {
return err
}
if len(loadHistList) > 0 {
*reply = *loadHistList[0]
}
return nil
}

View File

@@ -276,7 +276,7 @@ func TestV2CdrsMongoLoadTariffPlanFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := cdrsMongoRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
} else if loadInst.LoadId == "" {
} else if loadInst.RatingLoadID == "" || loadInst.AccountingLoadID == "" {
t.Error("Empty loadId received, loadInstance: ", loadInst)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups

View File

@@ -278,7 +278,7 @@ func TestV2CDRsMySQLLoadTariffPlanFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := cdrsRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
} else if loadInst.LoadId == "" {
} else if loadInst.RatingLoadID == "" || loadInst.AccountingLoadID == "" {
t.Error("Empty loadId received, loadInstance: ", loadInst)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups

View File

@@ -274,7 +274,7 @@ func TestV2CDRsPSQLLoadTariffPlanFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := cdrsPsqlRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
} else if loadInst.LoadId == "" {
} else if loadInst.RatingLoadID == "" || loadInst.AccountingLoadID == "" {
t.Error("Empty loadId received, loadInstance: ", loadInst)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups

View File

@@ -405,7 +405,7 @@ func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection,
start := time.Now()
cfi, err := utils.LoadCacheFileInfo(cfg.CacheDumpDir)
if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId || !utils.CacheFileExists(filepath.Join(cfg.CacheDumpDir, utils.ALIASES_PREFIX+".cache")) {
if err != nil || cfi.LoadInfo.AccountingLoadID != loadHist[0].AccountingLoadID || !utils.CacheFileExists(filepath.Join(cfg.CacheDumpDir, utils.ALIASES_PREFIX+".cache")) {
if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Aliases> Could not start, error: %s", err.Error()))
exitChan <- true

View File

@@ -74,7 +74,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
start := time.Now()
cfi, err := utils.LoadCacheFileInfo(cfg.CacheDumpDir)
if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId {
if err != nil || cfi.LoadInfo.RatingLoadID != loadHist[0].RatingLoadID {
if err := ratingDb.CacheRatingAll(); err != nil {
utils.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
exitChan <- true

View File

@@ -6,6 +6,12 @@
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"listen": {
"rpc_json": "0.0.0.0:3012", // RPC JSON listening address
"rpc_gob": "0.0.0.0:3013", // RPC GOB listening address
"http": "0.0.0.0:3080", // HTTP listening address
},
"rals": {
"enabled": true, // enable Rater service: <true|false>

View File

@@ -25,6 +25,7 @@ import (
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2"
@@ -641,10 +642,21 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Finished shared groups caching.")
}
CacheCommitTransaction()
loadHist, err := ms.GetLoadHistory(1, true)
if err != nil || len(loadHist) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err))
return err
loadHistList, err := ms.GetLoadHistory(1, true)
if err != nil || len(loadHistList) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHistList, err))
}
var loadHist *utils.LoadInstance
if len(loadHistList) == 0 {
loadHist = &utils.LoadInstance{
RatingLoadID: utils.GenUUID(),
AccountingLoadID: utils.GenUUID(),
LoadTime: time.Now(),
}
} else {
loadHist = loadHistList[0]
loadHist.RatingLoadID = utils.GenUUID()
loadHist.LoadTime = time.Now()
}
var keys []string
if len(dKeys) > 0 {
@@ -668,7 +680,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
if len(shgKeys) > 0 {
keys = append(keys, utils.SHARED_GROUP_PREFIX)
}
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]})
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
}
func (ms *MongoStorage) CacheAccountingAll() error {
@@ -742,7 +754,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
utils.Logger.Info("Finished aliases caching.")
}
utils.Logger.Info("Caching load history")
loadHist, err := ms.GetLoadHistory(1, true)
loadHistList, err := ms.GetLoadHistory(1, true)
if err != nil {
CacheRollbackTransaction()
return err
@@ -753,8 +765,21 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
if len(alsKeys) > 0 {
keys = append(keys, utils.ALIASES_PREFIX)
}
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]})
return nil
var loadHist *utils.LoadInstance
if len(loadHistList) == 0 {
loadHist = &utils.LoadInstance{
RatingLoadID: utils.GenUUID(),
AccountingLoadID: utils.GenUUID(),
LoadTime: time.Now(),
}
} else {
loadHist = loadHistList[0]
loadHist.AccountingLoadID = utils.GenUUID()
loadHist.LoadTime = time.Now()
}
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
}
func (ms *MongoStorage) HasData(category, subject string) (bool, error) {

View File

@@ -316,10 +316,21 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
CacheCommitTransaction()
loadHist, err := rs.GetLoadHistory(1, true)
if err != nil || len(loadHist) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err))
return err
loadHistList, err := rs.GetLoadHistory(1, true)
if err != nil || len(loadHistList) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHistList, err))
}
var loadHist *utils.LoadInstance
if len(loadHistList) == 0 {
loadHist = &utils.LoadInstance{
RatingLoadID: utils.GenUUID(),
AccountingLoadID: utils.GenUUID(),
LoadTime: time.Now(),
}
} else {
loadHist = loadHistList[0]
loadHist.RatingLoadID = utils.GenUUID()
loadHist.LoadTime = time.Now()
}
var keys []string
if len(dKeys) > 0 {
@@ -343,7 +354,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
if len(shgKeys) > 0 {
keys = append(keys, utils.SHARED_GROUP_PREFIX)
}
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]})
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
}
func (rs *RedisStorage) CacheAccountingAll() error {
@@ -423,12 +434,23 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) {
if len(alsKeys) > 0 {
keys = append(keys, utils.ALIASES_PREFIX)
}
loadHist, err := rs.GetLoadHistory(1, true)
if err != nil || len(loadHist) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err))
return err
loadHistList, err := rs.GetLoadHistory(1, true)
if err != nil || len(loadHistList) == 0 {
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHistList, err))
}
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]})
var loadHist *utils.LoadInstance
if len(loadHistList) == 0 {
loadHist = &utils.LoadInstance{
RatingLoadID: utils.GenUUID(),
AccountingLoadID: utils.GenUUID(),
LoadTime: time.Now(),
}
} else {
loadHist = loadHistList[0]
loadHist.AccountingLoadID = utils.GenUUID()
loadHist.LoadTime = time.Now()
}
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
}
// Used to check if specific subject is stored using prefix key attached to entity
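
The Mongo and Redis cache-save paths above follow the same pattern: read the newest load-history entry, regenerate the UUID for the side being cached (rating or accounting), stamp a new LoadTime, and fall back to a fresh instance when no history exists yet, so every cache save records a load instance. A consolidated sketch of that shared logic; the helper refreshLoadInstance is illustrative and not part of the codebase, while GenUUID and LoadInstance come from the utils package as shown in the diff:

package main // sketch only; the real logic is inlined in cacheRating/cacheAccounting

import (
	"time"

	"github.com/cgrates/cgrates/utils"
)

// refreshLoadInstance returns the load instance to persist with the cache dump:
// either the newest history entry with the relevant ID regenerated, or a brand
// new instance when the history is empty.
func refreshLoadInstance(loadHistList []*utils.LoadInstance, ratingSide bool) *utils.LoadInstance {
	if len(loadHistList) == 0 {
		return &utils.LoadInstance{
			RatingLoadID:     utils.GenUUID(),
			AccountingLoadID: utils.GenUUID(),
			LoadTime:         time.Now(),
		}
	}
	loadHist := loadHistList[0]
	if ratingSide {
		loadHist.RatingLoadID = utils.GenUUID() // rating-side cache save
	} else {
		loadHist.AccountingLoadID = utils.GenUUID() // accounting-side cache save
	}
	loadHist.LoadTime = time.Now()
	return loadHist
}

The resulting instance is what the storages hand to CacheSave as the LoadInfo of the dump's CacheFileInfo.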

View File

@@ -6,7 +6,6 @@ import (
"log"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/structmatcher"
"github.com/cgrates/cgrates/utils"
@@ -37,7 +36,7 @@ type TpReader struct {
cdrStats map[string]*CdrStats
users map[string]*UserProfile
aliases map[string]*Alias
loadInstance *utils.LoadInstance
//loadInstance *utils.LoadInstance
}
func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string, loadHistSize int) *TpReader {
@@ -1617,12 +1616,12 @@ func (tpr *TpReader) IsValid() bool {
return valid
}
func (tpr *TpReader) GetLoadInstance() *utils.LoadInstance {
/*func (tpr *TpReader) GetLoadInstance() *utils.LoadInstance {
if tpr.loadInstance == nil {
tpr.loadInstance = &utils.LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()}
}
return tpr.loadInstance
}
}*/
func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
if tpr.ratingStorage == nil || tpr.accountingStorage == nil {
@@ -1816,13 +1815,15 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Print("\t", al.GetId())
}
}
ldInst := tpr.GetLoadInstance()
if verbose {
log.Printf("LoadHistory, instance: %+v\n", ldInst)
}
if err = tpr.accountingStorage.AddLoadHistory(ldInst, tpr.loadHistSize); err != nil {
return err
}
/*
ldInst := tpr.GetLoadInstance()
if verbose {
log.Printf("LoadHistory, instance: %+v\n", ldInst)
}
if err = tpr.accountingStorage.AddLoadHistory(ldInst, tpr.loadHistSize); err != nil {
return err
}
*/
return
}

View File

@@ -577,19 +577,20 @@ type AttrCacheStats struct { // Add in the future filters here maybe so we avoid
}
type CacheStats struct {
Destinations int
RatingPlans int
RatingProfiles int
Actions int
ActionPlans int
SharedGroups int
DerivedChargers int
LcrProfiles int
CdrStats int
Users int
Aliases int
LastLoadId string
LastLoadTime string
Destinations int
RatingPlans int
RatingProfiles int
Actions int
ActionPlans int
SharedGroups int
DerivedChargers int
LcrProfiles int
CdrStats int
Users int
Aliases int
LastRatingLoadID string
LastAccountingLoadID string
LastLoadTime string
}
type AttrExpFileCdrs struct {

View File

@@ -8,9 +8,11 @@ import (
)
type LoadInstance struct {
LoadId string // Unique identifier for the load
TariffPlanId string // Tariff plan identificator for the data loaded
LoadTime time.Time // Time of load
//LoadId string // Unique identifier for the load
RatingLoadID string
AccountingLoadID string
//TariffPlanID string // Tariff plan identificator for the data loaded
LoadTime time.Time // Time of load
}
type CacheFileInfo struct {