mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
initial cache files mechanism
This commit is contained in:
@@ -1048,7 +1048,7 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetLoadHistory(attrs utils.Paginator, reply *[]*engine.LoadInstance) error {
|
||||
func (self *ApierV1) GetLoadHistory(attrs utils.Paginator, reply *[]*utils.LoadInstance) error {
|
||||
nrItems := -1
|
||||
offset := 0
|
||||
if attrs.Offset != nil { // For offset we need full data
|
||||
|
||||
@@ -36,7 +36,7 @@ import (
|
||||
var smgV1CfgPath string
|
||||
var smgV1Cfg *config.CGRConfig
|
||||
var smgV1Rpc *rpc.Client
|
||||
var smgV1LoadInst engine.LoadInstance // Share load information between tests
|
||||
var smgV1LoadInst utils.LoadInstance // Share load information between tests
|
||||
|
||||
func TestSMGV1InitCfg(t *testing.T) {
|
||||
if !*testLocal {
|
||||
|
||||
@@ -127,7 +127,7 @@ func (self *ApierV2) LoadDerivedChargers(attrs AttrLoadDerivedChargers, reply *s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *engine.LoadInstance) error {
|
||||
func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *utils.LoadInstance) error {
|
||||
if len(attrs.FolderPath) == 0 {
|
||||
return fmt.Errorf("%s:%s", utils.ErrMandatoryIeMissing.Error(), "FolderPath")
|
||||
}
|
||||
@@ -161,7 +161,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if attrs.DryRun {
|
||||
*reply = engine.LoadInstance{LoadId: utils.DRYRUN}
|
||||
*reply = utils.LoadInstance{LoadId: utils.DRYRUN}
|
||||
return nil // Mission complete, no errors
|
||||
}
|
||||
|
||||
|
||||
@@ -272,7 +272,7 @@ func TestV2CdrsMongoLoadTariffPlanFromFolder(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var loadInst engine.LoadInstance
|
||||
var loadInst utils.LoadInstance
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := cdrsMongoRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -274,7 +274,7 @@ func TestV2CDRsMySQLLoadTariffPlanFromFolder(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var loadInst engine.LoadInstance
|
||||
var loadInst utils.LoadInstance
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := cdrsRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -270,7 +270,7 @@ func TestV2CDRsPSQLLoadTariffPlanFromFolder(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var loadInst engine.LoadInstance
|
||||
var loadInst utils.LoadInstance
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := cdrsPsqlRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -13,6 +13,16 @@ const (
|
||||
DOUBLE_CACHE = true
|
||||
)
|
||||
|
||||
var (
|
||||
mux sync.RWMutex
|
||||
cache cacheStore
|
||||
// transaction stuff
|
||||
transactionBuffer []*transactionItem
|
||||
transactionMux sync.Mutex
|
||||
transactionON = false
|
||||
transactionLock = false
|
||||
)
|
||||
|
||||
type transactionItem struct {
|
||||
key string
|
||||
value interface{}
|
||||
@@ -27,16 +37,6 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
mux sync.RWMutex
|
||||
cache cacheStore
|
||||
// transaction stuff
|
||||
transactionBuffer []*transactionItem
|
||||
transactionMux sync.Mutex
|
||||
transactionON = false
|
||||
transactionLock = false
|
||||
)
|
||||
|
||||
func BeginTransaction() {
|
||||
transactionMux.Lock()
|
||||
transactionLock = true
|
||||
@@ -74,6 +74,22 @@ func CommitTransaction() {
|
||||
transactionMux.Unlock()
|
||||
}
|
||||
|
||||
func Save(path string, keys []string) error {
|
||||
if !transactionLock {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
}
|
||||
return cache.Save(path, keys)
|
||||
}
|
||||
|
||||
func Load(path string, keys []string) error {
|
||||
if !transactionLock {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
}
|
||||
return cache.Load(path, keys)
|
||||
}
|
||||
|
||||
// The function to be used to cache a key/value pair when expiration is not needed
|
||||
func Cache(key string, value interface{}) {
|
||||
if !transactionLock {
|
||||
|
||||
@@ -2,7 +2,11 @@
|
||||
package cache2go
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -17,6 +21,8 @@ type cacheStore interface {
|
||||
CountEntriesForPrefix(string) int
|
||||
GetAllForPrefix(string) (map[string]interface{}, error)
|
||||
GetKeysForPrefix(string) []string
|
||||
Save(string, []string) error
|
||||
Load(string, []string) error
|
||||
}
|
||||
|
||||
// easy to be counted exported by prefix
|
||||
@@ -108,6 +114,69 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (cs cacheDoubleStore) Save(path string, keys []string) error {
|
||||
// create a the path
|
||||
if err := os.MkdirAll(path, 0766); err != nil {
|
||||
utils.Logger.Err("<cache encoder>:" + err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, key := range keys {
|
||||
key = key[:PREFIX_LEN]
|
||||
value, found := cs[key]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(fileName string, data map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
dataFile, err := os.Create(filepath.Join(path, fileName) + ".cache")
|
||||
defer dataFile.Close()
|
||||
if err != nil {
|
||||
utils.Logger.Err("<cache encoder>:" + err.Error())
|
||||
}
|
||||
|
||||
// serialize the data
|
||||
dataEncoder := gob.NewEncoder(dataFile)
|
||||
if err := dataEncoder.Encode(data); err != nil {
|
||||
utils.Logger.Err("<cache encoder>:" + err.Error())
|
||||
}
|
||||
}(key, value)
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs cacheDoubleStore) Load(path string, keys []string) error {
|
||||
var wg sync.WaitGroup
|
||||
for _, key := range keys {
|
||||
key = key[:PREFIX_LEN] // make sure it's only limited to prefix length'
|
||||
file := filepath.Join(path, key+".cache")
|
||||
wg.Add(1)
|
||||
go func(fileName, key string) {
|
||||
defer wg.Done()
|
||||
|
||||
// open data file
|
||||
dataFile, err := os.Open(fileName)
|
||||
defer dataFile.Close()
|
||||
if err != nil {
|
||||
utils.Logger.Err("<cache decoder>: " + err.Error())
|
||||
}
|
||||
|
||||
val := make(map[string]interface{})
|
||||
dataDecoder := gob.NewDecoder(dataFile)
|
||||
err = dataDecoder.Decode(&val)
|
||||
if err != nil {
|
||||
utils.Logger.Err("<cache decoder>: " + err.Error())
|
||||
}
|
||||
cs[key] = val
|
||||
}(file, key)
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// faster to access
|
||||
type cacheSimpleStore struct {
|
||||
cache map[string]interface{}
|
||||
@@ -232,3 +301,13 @@ func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (cs cacheSimpleStore) Save(path string, keys []string) error {
|
||||
utils.Logger.Info("simplestore save")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs cacheSimpleStore) Load(path string, keys []string) error {
|
||||
utils.Logger.Info("simplestore load")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"path/filepath"
|
||||
// _ "net/http/pprof"
|
||||
"os"
|
||||
"runtime"
|
||||
@@ -33,6 +34,7 @@ import (
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/balancer2go"
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/cdrc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -394,11 +396,41 @@ func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, a
|
||||
func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
|
||||
aliasesServer := engine.NewAliasHandler(accountDb)
|
||||
server.RpcRegisterName("AliasesV1", aliasesServer)
|
||||
if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Aliases> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
loadHist, err := accountDb.GetLoadHistory(1, true)
|
||||
if err != nil || len(loadHist) == 0 {
|
||||
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err))
|
||||
internalAliaseSChan <- aliasesServer
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
cfi, err := utils.LoadCacheFileInfo("/tmp/cgr_cache")
|
||||
if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId || !utils.CacheFileExists(filepath.Join("/tmp/cgr_cache", utils.ALIASES_PREFIX+".cache")) {
|
||||
if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Aliases> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start)))
|
||||
|
||||
start = time.Now()
|
||||
if err := utils.SaveCacheFileInfo("/tmp/cgr_cache", &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil {
|
||||
utils.Logger.Crit("could not write cache info file: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := cache2go.Save("/tmp/cgr_cache", []string{utils.ALIASES_PREFIX}); err != nil {
|
||||
utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error()))
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Cache accounting save time: %v", time.Since(start)))
|
||||
} else {
|
||||
if err := cache2go.Load("/tmp/cgr_cache", []string{utils.ALIASES_PREFIX}); err != nil {
|
||||
utils.Logger.Crit("could not load cache file: " + err.Error())
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Cache accounting load time: %v", time.Since(start)))
|
||||
}
|
||||
internalAliaseSChan <- aliasesServer
|
||||
}
|
||||
|
||||
|
||||
@@ -19,12 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/balancer2go"
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
@@ -32,6 +34,17 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(map[interface{}]struct{}{})
|
||||
gob.Register(engine.Actions{})
|
||||
gob.RegisterName("github.com/cgrates/cgrates/engine.ActionPlan", &engine.ActionPlan{})
|
||||
gob.Register([]*utils.LoadInstance{})
|
||||
gob.RegisterName("github.com/cgrates/cgrates/engine.RatingPlan", engine.RatingPlan{})
|
||||
gob.RegisterName("github.com/cgrates/cgrates/engine.RatingProfile", engine.RatingProfile{})
|
||||
gob.RegisterName("github.com/cgrates/cgrates/utils.DerivedChargers", utils.DerivedChargers{})
|
||||
gob.Register(engine.AliasValues{})
|
||||
}
|
||||
|
||||
func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) {
|
||||
bal := balancer2go.NewBalancer()
|
||||
go stopBalancerSignalHandler(bal, exitChan)
|
||||
@@ -40,7 +53,6 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
|
||||
}
|
||||
|
||||
// Starts rater and reports on chan
|
||||
|
||||
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
|
||||
internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
|
||||
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
|
||||
@@ -54,15 +66,47 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
waitTasks = append(waitTasks, cacheTaskChan)
|
||||
go func() {
|
||||
defer close(cacheTaskChan)
|
||||
if err := ratingDb.CacheRatingAll(); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
|
||||
loadHist, err := accountDb.GetLoadHistory(1, true)
|
||||
if err != nil || len(loadHist) == 0 {
|
||||
utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err))
|
||||
cacheDoneChan <- struct{}{}
|
||||
return
|
||||
}
|
||||
if err := accountDb.CacheAccountingPrefixes(); err != nil { // Used to cache load history
|
||||
utils.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
|
||||
start := time.Now()
|
||||
cfi, err := utils.LoadCacheFileInfo("/tmp/cgr_cache")
|
||||
if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId {
|
||||
if err := ratingDb.CacheRatingAll(); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
/*if err := accountDb.CacheAccountingPrefixes(); err != nil { // Used to cache load history
|
||||
utils.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}*/
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("Cache rating creation time: %v", time.Since(start)))
|
||||
|
||||
start = time.Now()
|
||||
if err := utils.SaveCacheFileInfo("/tmp/cgr_cache", &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil {
|
||||
utils.Logger.Crit("could not write cache info file: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := cache2go.Save("/tmp/cgr_cache", []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}); err != nil {
|
||||
utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error()))
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Cache rating save time: %v", time.Since(start)))
|
||||
} else {
|
||||
if err := cache2go.Load("/tmp/cgr_cache", []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}); err != nil {
|
||||
utils.Logger.Crit("could not load cache file: " + err.Error())
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Cache rating load time: %v", time.Since(start)))
|
||||
}
|
||||
cacheDoneChan <- struct{}{}
|
||||
}()
|
||||
|
||||
@@ -18,10 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func init() {
|
||||
c := &CmdGetLoadHistory{
|
||||
@@ -61,6 +58,6 @@ func (self *CmdGetLoadHistory) PostprocessRpcParams() error {
|
||||
}
|
||||
|
||||
func (self *CmdGetLoadHistory) RpcResult() interface{} {
|
||||
a := make([]*engine.LoadInstance, 0)
|
||||
a := make([]*utils.LoadInstance, 0)
|
||||
return &a
|
||||
}
|
||||
|
||||
@@ -110,8 +110,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
DEBIT_RESET: debitResetAction,
|
||||
DEBIT: debitAction,
|
||||
RESET_COUNTERS: resetCountersAction,
|
||||
ENABLE_ACCOUNT: enableUserAction,
|
||||
DISABLE_ACCOUNT: disableUserAction,
|
||||
ENABLE_ACCOUNT: enableAccountAction,
|
||||
DISABLE_ACCOUNT: disableAccountAction,
|
||||
//case ENABLE_DISABLE_BALANCE:
|
||||
// return enableDisableBalanceAction, true
|
||||
CALL_URL: callUrl,
|
||||
@@ -376,19 +376,19 @@ func genericDebit(ub *Account, a *Action, reset bool) (err error) {
|
||||
return ub.debitBalanceAction(a, reset)
|
||||
}
|
||||
|
||||
func enableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
|
||||
if ub == nil {
|
||||
func enableAccountAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
|
||||
if acc == nil {
|
||||
return errors.New("nil account")
|
||||
}
|
||||
ub.Disabled = false
|
||||
acc.Disabled = false
|
||||
return
|
||||
}
|
||||
|
||||
func disableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
|
||||
if ub == nil {
|
||||
func disableAccountAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
|
||||
if acc == nil {
|
||||
return errors.New("nil account")
|
||||
}
|
||||
ub.Disabled = true
|
||||
acc.Disabled = true
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/ugorji/go/codec"
|
||||
@@ -92,8 +91,8 @@ type AccountingStorage interface {
|
||||
SetAlias(*Alias) error
|
||||
GetAlias(string, bool) (*Alias, error)
|
||||
RemoveAlias(string) error
|
||||
GetLoadHistory(int, bool) ([]*LoadInstance, error)
|
||||
AddLoadHistory(*LoadInstance, int) error
|
||||
GetLoadHistory(int, bool) ([]*utils.LoadInstance, error)
|
||||
AddLoadHistory(*utils.LoadInstance, int) error
|
||||
GetStructVersion() (*StructVersion, error)
|
||||
SetStructVersion(*StructVersion) error
|
||||
}
|
||||
@@ -252,9 +251,3 @@ func (gm *GOBMarshaler) Marshal(v interface{}) (data []byte, err error) {
|
||||
func (gm *GOBMarshaler) Unmarshal(data []byte, v interface{}) error {
|
||||
return gob.NewDecoder(bytes.NewBuffer(data)).Decode(v)
|
||||
}
|
||||
|
||||
type LoadInstance struct {
|
||||
LoadId string // Unique identifier for the load
|
||||
TariffPlanId string // Tariff plan identificator for the data loaded
|
||||
LoadTime time.Time // Time of load
|
||||
}
|
||||
|
||||
@@ -696,13 +696,13 @@ func (ms *MapStorage) RemoveAlias(key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*LoadInstance, error) {
|
||||
func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*utils.LoadInstance, error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) AddLoadHistory(*LoadInstance, int) error {
|
||||
func (ms *MapStorage) AddLoadHistory(*utils.LoadInstance, int) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
return nil
|
||||
|
||||
@@ -1214,7 +1214,7 @@ func (ms *MongoStorage) RemoveAlias(key string) (err error) {
|
||||
}
|
||||
|
||||
// Limit will only retrieve the last n items out of history, newest first
|
||||
func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*LoadInstance, err error) {
|
||||
func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*utils.LoadInstance, err error) {
|
||||
if limit == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -1222,7 +1222,7 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*
|
||||
if x, err := cache2go.Get(utils.LOADINST_KEY); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
items := x.([]*LoadInstance)
|
||||
items := x.([]*utils.LoadInstance)
|
||||
if len(items) < limit || limit == -1 {
|
||||
return items, nil
|
||||
}
|
||||
@@ -1231,7 +1231,7 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*
|
||||
}
|
||||
var kv struct {
|
||||
Key string
|
||||
Value []*LoadInstance
|
||||
Value []*utils.LoadInstance
|
||||
}
|
||||
session, col := ms.conn(colLht)
|
||||
defer session.Close()
|
||||
@@ -1245,15 +1245,15 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*
|
||||
}
|
||||
|
||||
// Adds a single load instance to load history
|
||||
func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) error {
|
||||
func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int) error {
|
||||
if loadHistSize == 0 { // Load history disabled
|
||||
return nil
|
||||
}
|
||||
// get existing load history
|
||||
var existingLoadHistory []*LoadInstance
|
||||
var existingLoadHistory []*utils.LoadInstance
|
||||
var kv struct {
|
||||
Key string
|
||||
Value []*LoadInstance
|
||||
Value []*utils.LoadInstance
|
||||
}
|
||||
session, col := ms.conn(colLht)
|
||||
defer session.Close()
|
||||
@@ -1282,7 +1282,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct {
|
||||
Key string
|
||||
Value []*LoadInstance
|
||||
Value []*utils.LoadInstance
|
||||
}{Key: utils.LOADINST_KEY, Value: existingLoadHistory})
|
||||
return nil, err
|
||||
}, 0, utils.LOADINST_KEY)
|
||||
|
||||
@@ -865,7 +865,7 @@ func (rs *RedisStorage) RemoveAlias(key string) (err error) {
|
||||
}
|
||||
|
||||
// Limit will only retrieve the last n items out of history, newest first
|
||||
func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstance, error) {
|
||||
func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*utils.LoadInstance, error) {
|
||||
if limit == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -873,7 +873,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan
|
||||
if x, err := cache2go.Get(utils.LOADINST_KEY); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
items := x.([]*LoadInstance)
|
||||
items := x.([]*utils.LoadInstance)
|
||||
if len(items) < limit || limit == -1 {
|
||||
return items, nil
|
||||
}
|
||||
@@ -887,9 +887,9 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
loadInsts := make([]*LoadInstance, len(marshaleds))
|
||||
loadInsts := make([]*utils.LoadInstance, len(marshaleds))
|
||||
for idx, marshaled := range marshaleds {
|
||||
var lInst LoadInstance
|
||||
var lInst utils.LoadInstance
|
||||
err = rs.ms.Unmarshal(marshaled, &lInst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -902,7 +902,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan
|
||||
}
|
||||
|
||||
// Adds a single load instance to load history
|
||||
func (rs *RedisStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) error {
|
||||
func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int) error {
|
||||
conn, err := rs.db.Get()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -37,7 +37,7 @@ type TpReader struct {
|
||||
cdrStats map[string]*CdrStats
|
||||
users map[string]*UserProfile
|
||||
aliases map[string]*Alias
|
||||
loadInstance *LoadInstance
|
||||
loadInstance *utils.LoadInstance
|
||||
}
|
||||
|
||||
func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string, loadHistSize int) *TpReader {
|
||||
@@ -1617,9 +1617,9 @@ func (tpr *TpReader) IsValid() bool {
|
||||
return valid
|
||||
}
|
||||
|
||||
func (tpr *TpReader) GetLoadInstance() *LoadInstance {
|
||||
func (tpr *TpReader) GetLoadInstance() *utils.LoadInstance {
|
||||
if tpr.loadInstance == nil {
|
||||
tpr.loadInstance = &LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()}
|
||||
tpr.loadInstance = &utils.LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()}
|
||||
}
|
||||
return tpr.loadInstance
|
||||
}
|
||||
|
||||
69
utils/cache_file_info.go
Normal file
69
utils/cache_file_info.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LoadInstance struct {
|
||||
LoadId string // Unique identifier for the load
|
||||
TariffPlanId string // Tariff plan identificator for the data loaded
|
||||
LoadTime time.Time // Time of load
|
||||
}
|
||||
|
||||
type CacheFileInfo struct {
|
||||
Encoding string
|
||||
LoadInfo *LoadInstance
|
||||
}
|
||||
|
||||
func LoadCacheFileInfo(path string) (*CacheFileInfo, error) {
|
||||
// open data file
|
||||
dataFile, err := os.Open(filepath.Join(path, "cache.info"))
|
||||
defer dataFile.Close()
|
||||
if err != nil {
|
||||
Logger.Err("<cache decoder>: " + err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filesInfo := &CacheFileInfo{}
|
||||
dataDecoder := json.NewDecoder(dataFile)
|
||||
err = dataDecoder.Decode(filesInfo)
|
||||
if err != nil {
|
||||
Logger.Err("<cache decoder>: " + err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return filesInfo, nil
|
||||
}
|
||||
|
||||
func SaveCacheFileInfo(path string, cfi *CacheFileInfo) error {
|
||||
// open data file
|
||||
// create a the path
|
||||
if err := os.MkdirAll(path, 0766); err != nil {
|
||||
Logger.Err("<cache encoder>:" + err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
dataFile, err := os.Create(filepath.Join(path, "cache.info"))
|
||||
defer dataFile.Close()
|
||||
if err != nil {
|
||||
Logger.Err("<cache encoder>:" + err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// serialize the data
|
||||
dataEncoder := json.NewEncoder(dataFile)
|
||||
if err := dataEncoder.Encode(cfi); err != nil {
|
||||
Logger.Err("<cache encoder>:" + err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func CacheFileExists(filePath string) bool {
|
||||
if _, err := os.Stat(filePath); err == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user