diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 1c6162ffe..b9932fd3e 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1048,7 +1048,7 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st return nil } -func (self *ApierV1) GetLoadHistory(attrs utils.Paginator, reply *[]*engine.LoadInstance) error { +func (self *ApierV1) GetLoadHistory(attrs utils.Paginator, reply *[]*utils.LoadInstance) error { nrItems := -1 offset := 0 if attrs.Offset != nil { // For offset we need full data diff --git a/apier/v1/smgenericv1_it_test.go b/apier/v1/smgenericv1_it_test.go index 17da136f8..e5af07e72 100644 --- a/apier/v1/smgenericv1_it_test.go +++ b/apier/v1/smgenericv1_it_test.go @@ -36,7 +36,7 @@ import ( var smgV1CfgPath string var smgV1Cfg *config.CGRConfig var smgV1Rpc *rpc.Client -var smgV1LoadInst engine.LoadInstance // Share load information between tests +var smgV1LoadInst utils.LoadInstance // Share load information between tests func TestSMGV1InitCfg(t *testing.T) { if !*testLocal { diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 6d25140b6..8ece3d181 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -127,7 +127,7 @@ func (self *ApierV2) LoadDerivedChargers(attrs AttrLoadDerivedChargers, reply *s return nil } -func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *engine.LoadInstance) error { +func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, reply *utils.LoadInstance) error { if len(attrs.FolderPath) == 0 { return fmt.Errorf("%s:%s", utils.ErrMandatoryIeMissing.Error(), "FolderPath") } @@ -161,7 +161,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, return utils.NewErrServerError(err) } if attrs.DryRun { - *reply = engine.LoadInstance{LoadId: utils.DRYRUN} + *reply = utils.LoadInstance{LoadId: utils.DRYRUN} return nil // Mission complete, no errors } diff --git a/apier/v2/cdrs_mongo_local_test.go b/apier/v2/cdrs_mongo_local_test.go index 14e0d2e9f..15446ab83 100644 --- a/apier/v2/cdrs_mongo_local_test.go +++ b/apier/v2/cdrs_mongo_local_test.go @@ -272,7 +272,7 @@ func TestV2CdrsMongoLoadTariffPlanFromFolder(t *testing.T) { if !*testLocal { return } - var loadInst engine.LoadInstance + var loadInst utils.LoadInstance attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} if err := cdrsMongoRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { t.Error(err) diff --git a/apier/v2/cdrs_mysql_local_test.go b/apier/v2/cdrs_mysql_local_test.go index 60fddfc70..4978b280c 100644 --- a/apier/v2/cdrs_mysql_local_test.go +++ b/apier/v2/cdrs_mysql_local_test.go @@ -274,7 +274,7 @@ func TestV2CDRsMySQLLoadTariffPlanFromFolder(t *testing.T) { if !*testLocal { return } - var loadInst engine.LoadInstance + var loadInst utils.LoadInstance attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} if err := cdrsRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { t.Error(err) diff --git a/apier/v2/cdrs_psql_local_test.go b/apier/v2/cdrs_psql_local_test.go index 03d387c01..30fa12782 100644 --- a/apier/v2/cdrs_psql_local_test.go +++ b/apier/v2/cdrs_psql_local_test.go @@ -270,7 +270,7 @@ func TestV2CDRsPSQLLoadTariffPlanFromFolder(t *testing.T) { if !*testLocal { return } - var loadInst engine.LoadInstance + var loadInst utils.LoadInstance attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} if err := cdrsPsqlRpc.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { t.Error(err) diff --git a/cache2go/cache.go b/cache2go/cache.go index 6fcc6d8d1..0a2500ff8 100644 --- a/cache2go/cache.go +++ b/cache2go/cache.go @@ -13,6 +13,16 @@ const ( DOUBLE_CACHE = true ) +var ( + mux sync.RWMutex + cache cacheStore + // transaction stuff + transactionBuffer []*transactionItem + transactionMux sync.Mutex + transactionON = false + transactionLock = false +) + type transactionItem struct { key string value interface{} @@ -27,16 +37,6 @@ func init() { } } -var ( - mux sync.RWMutex - cache cacheStore - // transaction stuff - transactionBuffer []*transactionItem - transactionMux sync.Mutex - transactionON = false - transactionLock = false -) - func BeginTransaction() { transactionMux.Lock() transactionLock = true @@ -74,6 +74,22 @@ func CommitTransaction() { transactionMux.Unlock() } +func Save(path string, keys []string) error { + if !transactionLock { + mux.Lock() + defer mux.Unlock() + } + return cache.Save(path, keys) +} + +func Load(path string, keys []string) error { + if !transactionLock { + mux.Lock() + defer mux.Unlock() + } + return cache.Load(path, keys) +} + // The function to be used to cache a key/value pair when expiration is not needed func Cache(key string, value interface{}) { if !transactionLock { diff --git a/cache2go/store.go b/cache2go/store.go index 572d02e4b..d39601b1a 100644 --- a/cache2go/store.go +++ b/cache2go/store.go @@ -2,7 +2,11 @@ package cache2go import ( + "encoding/gob" + "os" + "path/filepath" "strings" + "sync" "github.com/cgrates/cgrates/utils" ) @@ -17,6 +21,8 @@ type cacheStore interface { CountEntriesForPrefix(string) int GetAllForPrefix(string) (map[string]interface{}, error) GetKeysForPrefix(string) []string + Save(string, []string) error + Load(string, []string) error } // easy to be counted exported by prefix @@ -108,6 +114,69 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) { return } +func (cs cacheDoubleStore) Save(path string, keys []string) error { + // create a the path + if err := os.MkdirAll(path, 0766); err != nil { + utils.Logger.Err(":" + err.Error()) + return err + } + + var wg sync.WaitGroup + for _, key := range keys { + key = key[:PREFIX_LEN] + value, found := cs[key] + if !found { + continue + } + wg.Add(1) + go func(fileName string, data map[string]interface{}) { + defer wg.Done() + dataFile, err := os.Create(filepath.Join(path, fileName) + ".cache") + defer dataFile.Close() + if err != nil { + utils.Logger.Err(":" + err.Error()) + } + + // serialize the data + dataEncoder := gob.NewEncoder(dataFile) + if err := dataEncoder.Encode(data); err != nil { + utils.Logger.Err(":" + err.Error()) + } + }(key, value) + } + wg.Wait() + return nil +} + +func (cs cacheDoubleStore) Load(path string, keys []string) error { + var wg sync.WaitGroup + for _, key := range keys { + key = key[:PREFIX_LEN] // make sure it's only limited to prefix length' + file := filepath.Join(path, key+".cache") + wg.Add(1) + go func(fileName, key string) { + defer wg.Done() + + // open data file + dataFile, err := os.Open(fileName) + defer dataFile.Close() + if err != nil { + utils.Logger.Err(": " + err.Error()) + } + + val := make(map[string]interface{}) + dataDecoder := gob.NewDecoder(dataFile) + err = dataDecoder.Decode(&val) + if err != nil { + utils.Logger.Err(": " + err.Error()) + } + cs[key] = val + }(file, key) + } + wg.Wait() + return nil +} + // faster to access type cacheSimpleStore struct { cache map[string]interface{} @@ -232,3 +301,13 @@ func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) { } return } + +func (cs cacheSimpleStore) Save(path string, keys []string) error { + utils.Logger.Info("simplestore save") + return nil +} + +func (cs cacheSimpleStore) Load(path string, keys []string) error { + utils.Logger.Info("simplestore load") + return nil +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 8c0f62af0..07253dba0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -22,6 +22,7 @@ import ( "flag" "fmt" "log" + "path/filepath" // _ "net/http/pprof" "os" "runtime" @@ -33,6 +34,7 @@ import ( "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/balancer2go" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/cdrc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -394,11 +396,41 @@ func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, a func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { aliasesServer := engine.NewAliasHandler(accountDb) server.RpcRegisterName("AliasesV1", aliasesServer) - if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true + loadHist, err := accountDb.GetLoadHistory(1, true) + if err != nil || len(loadHist) == 0 { + utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err)) + internalAliaseSChan <- aliasesServer return } + + start := time.Now() + cfi, err := utils.LoadCacheFileInfo("/tmp/cgr_cache") + if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId || !utils.CacheFileExists(filepath.Join("/tmp/cgr_cache", utils.ALIASES_PREFIX+".cache")) { + if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start))) + + start = time.Now() + if err := utils.SaveCacheFileInfo("/tmp/cgr_cache", &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil { + utils.Logger.Crit("could not write cache info file: " + err.Error()) + return + } + + if err := cache2go.Save("/tmp/cgr_cache", []string{utils.ALIASES_PREFIX}); err != nil { + utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error())) + } + utils.Logger.Info(fmt.Sprintf("Cache accounting save time: %v", time.Since(start))) + } else { + if err := cache2go.Load("/tmp/cgr_cache", []string{utils.ALIASES_PREFIX}); err != nil { + utils.Logger.Crit("could not load cache file: " + err.Error()) + exitChan <- true + return + } + utils.Logger.Info(fmt.Sprintf("Cache accounting load time: %v", time.Since(start))) + } internalAliaseSChan <- aliasesServer } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index e15a413a0..f121b3fc9 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -19,12 +19,14 @@ along with this program. If not, see package main import ( + "encoding/gob" "fmt" "time" "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/balancer2go" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/scheduler" @@ -32,6 +34,17 @@ import ( "github.com/cgrates/rpcclient" ) +func init() { + gob.Register(map[interface{}]struct{}{}) + gob.Register(engine.Actions{}) + gob.RegisterName("github.com/cgrates/cgrates/engine.ActionPlan", &engine.ActionPlan{}) + gob.Register([]*utils.LoadInstance{}) + gob.RegisterName("github.com/cgrates/cgrates/engine.RatingPlan", engine.RatingPlan{}) + gob.RegisterName("github.com/cgrates/cgrates/engine.RatingProfile", engine.RatingProfile{}) + gob.RegisterName("github.com/cgrates/cgrates/utils.DerivedChargers", utils.DerivedChargers{}) + gob.Register(engine.AliasValues{}) +} + func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) { bal := balancer2go.NewBalancer() go stopBalancerSignalHandler(bal, exitChan) @@ -40,7 +53,6 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled } // Starts rater and reports on chan - func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler, internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, @@ -54,15 +66,47 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cacheTaskChan) go func() { defer close(cacheTaskChan) - if err := ratingDb.CacheRatingAll(); err != nil { - utils.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) - exitChan <- true + + loadHist, err := accountDb.GetLoadHistory(1, true) + if err != nil || len(loadHist) == 0 { + utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHist, err)) + cacheDoneChan <- struct{}{} return } - if err := accountDb.CacheAccountingPrefixes(); err != nil { // Used to cache load history - utils.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - exitChan <- true - return + + start := time.Now() + cfi, err := utils.LoadCacheFileInfo("/tmp/cgr_cache") + if err != nil || cfi.LoadInfo.LoadId != loadHist[0].LoadId { + if err := ratingDb.CacheRatingAll(); err != nil { + utils.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) + exitChan <- true + return + } + /*if err := accountDb.CacheAccountingPrefixes(); err != nil { // Used to cache load history + utils.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) + exitChan <- true + return + }*/ + + utils.Logger.Info(fmt.Sprintf("Cache rating creation time: %v", time.Since(start))) + + start = time.Now() + if err := utils.SaveCacheFileInfo("/tmp/cgr_cache", &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil { + utils.Logger.Crit("could not write cache info file: " + err.Error()) + return + } + + if err := cache2go.Save("/tmp/cgr_cache", []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}); err != nil { + utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error())) + } + utils.Logger.Info(fmt.Sprintf("Cache rating save time: %v", time.Since(start))) + } else { + if err := cache2go.Load("/tmp/cgr_cache", []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}); err != nil { + utils.Logger.Crit("could not load cache file: " + err.Error()) + exitChan <- true + return + } + utils.Logger.Info(fmt.Sprintf("Cache rating load time: %v", time.Since(start))) } cacheDoneChan <- struct{}{} }() diff --git a/console/load_history.go b/console/load_history.go index fc4976059..60e88bc2f 100644 --- a/console/load_history.go +++ b/console/load_history.go @@ -18,10 +18,7 @@ along with this program. If not, see package console -import ( - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) +import "github.com/cgrates/cgrates/utils" func init() { c := &CmdGetLoadHistory{ @@ -61,6 +58,6 @@ func (self *CmdGetLoadHistory) PostprocessRpcParams() error { } func (self *CmdGetLoadHistory) RpcResult() interface{} { - a := make([]*engine.LoadInstance, 0) + a := make([]*utils.LoadInstance, 0) return &a } diff --git a/engine/action.go b/engine/action.go index 03d81e289..9d88a0e4c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -110,8 +110,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { DEBIT_RESET: debitResetAction, DEBIT: debitAction, RESET_COUNTERS: resetCountersAction, - ENABLE_ACCOUNT: enableUserAction, - DISABLE_ACCOUNT: disableUserAction, + ENABLE_ACCOUNT: enableAccountAction, + DISABLE_ACCOUNT: disableAccountAction, //case ENABLE_DISABLE_BALANCE: // return enableDisableBalanceAction, true CALL_URL: callUrl, @@ -376,19 +376,19 @@ func genericDebit(ub *Account, a *Action, reset bool) (err error) { return ub.debitBalanceAction(a, reset) } -func enableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) { - if ub == nil { +func enableAccountAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) { + if acc == nil { return errors.New("nil account") } - ub.Disabled = false + acc.Disabled = false return } -func disableUserAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) { - if ub == nil { +func disableAccountAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) { + if acc == nil { return errors.New("nil account") } - ub.Disabled = true + acc.Disabled = true return } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 7d88886b0..c18d3a4eb 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "encoding/json" "reflect" - "time" "github.com/cgrates/cgrates/utils" "github.com/ugorji/go/codec" @@ -92,8 +91,8 @@ type AccountingStorage interface { SetAlias(*Alias) error GetAlias(string, bool) (*Alias, error) RemoveAlias(string) error - GetLoadHistory(int, bool) ([]*LoadInstance, error) - AddLoadHistory(*LoadInstance, int) error + GetLoadHistory(int, bool) ([]*utils.LoadInstance, error) + AddLoadHistory(*utils.LoadInstance, int) error GetStructVersion() (*StructVersion, error) SetStructVersion(*StructVersion) error } @@ -252,9 +251,3 @@ func (gm *GOBMarshaler) Marshal(v interface{}) (data []byte, err error) { func (gm *GOBMarshaler) Unmarshal(data []byte, v interface{}) error { return gob.NewDecoder(bytes.NewBuffer(data)).Decode(v) } - -type LoadInstance struct { - LoadId string // Unique identifier for the load - TariffPlanId string // Tariff plan identificator for the data loaded - LoadTime time.Time // Time of load -} diff --git a/engine/storage_map.go b/engine/storage_map.go index dfd961abc..be821d54d 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -696,13 +696,13 @@ func (ms *MapStorage) RemoveAlias(key string) error { return nil } -func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*LoadInstance, error) { +func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*utils.LoadInstance, error) { ms.mu.RLock() defer ms.mu.RUnlock() return nil, nil } -func (ms *MapStorage) AddLoadHistory(*LoadInstance, int) error { +func (ms *MapStorage) AddLoadHistory(*utils.LoadInstance, int) error { ms.mu.Lock() defer ms.mu.Unlock() return nil diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 1879c3b1e..7437e307b 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1214,7 +1214,7 @@ func (ms *MongoStorage) RemoveAlias(key string) (err error) { } // Limit will only retrieve the last n items out of history, newest first -func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*LoadInstance, err error) { +func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*utils.LoadInstance, err error) { if limit == 0 { return nil, nil } @@ -1222,7 +1222,7 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []* if x, err := cache2go.Get(utils.LOADINST_KEY); err != nil { return nil, err } else { - items := x.([]*LoadInstance) + items := x.([]*utils.LoadInstance) if len(items) < limit || limit == -1 { return items, nil } @@ -1231,7 +1231,7 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []* } var kv struct { Key string - Value []*LoadInstance + Value []*utils.LoadInstance } session, col := ms.conn(colLht) defer session.Close() @@ -1245,15 +1245,15 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []* } // Adds a single load instance to load history -func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) error { +func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int) error { if loadHistSize == 0 { // Load history disabled return nil } // get existing load history - var existingLoadHistory []*LoadInstance + var existingLoadHistory []*utils.LoadInstance var kv struct { Key string - Value []*LoadInstance + Value []*utils.LoadInstance } session, col := ms.conn(colLht) defer session.Close() @@ -1282,7 +1282,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e defer session.Close() _, err = col.Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct { Key string - Value []*LoadInstance + Value []*utils.LoadInstance }{Key: utils.LOADINST_KEY, Value: existingLoadHistory}) return nil, err }, 0, utils.LOADINST_KEY) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 201a4585a..e82db4c01 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -865,7 +865,7 @@ func (rs *RedisStorage) RemoveAlias(key string) (err error) { } // Limit will only retrieve the last n items out of history, newest first -func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstance, error) { +func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*utils.LoadInstance, error) { if limit == 0 { return nil, nil } @@ -873,7 +873,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan if x, err := cache2go.Get(utils.LOADINST_KEY); err != nil { return nil, err } else { - items := x.([]*LoadInstance) + items := x.([]*utils.LoadInstance) if len(items) < limit || limit == -1 { return items, nil } @@ -887,9 +887,9 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan if err != nil { return nil, err } - loadInsts := make([]*LoadInstance, len(marshaleds)) + loadInsts := make([]*utils.LoadInstance, len(marshaleds)) for idx, marshaled := range marshaleds { - var lInst LoadInstance + var lInst utils.LoadInstance err = rs.ms.Unmarshal(marshaled, &lInst) if err != nil { return nil, err @@ -902,7 +902,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool) ([]*LoadInstan } // Adds a single load instance to load history -func (rs *RedisStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) error { +func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int) error { conn, err := rs.db.Get() if err != nil { return err diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 4eaf2e81f..1e3ed8ea9 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -37,7 +37,7 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias - loadInstance *LoadInstance + loadInstance *utils.LoadInstance } func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string, loadHistSize int) *TpReader { @@ -1617,9 +1617,9 @@ func (tpr *TpReader) IsValid() bool { return valid } -func (tpr *TpReader) GetLoadInstance() *LoadInstance { +func (tpr *TpReader) GetLoadInstance() *utils.LoadInstance { if tpr.loadInstance == nil { - tpr.loadInstance = &LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()} + tpr.loadInstance = &utils.LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()} } return tpr.loadInstance } diff --git a/utils/cache_file_info.go b/utils/cache_file_info.go new file mode 100644 index 000000000..7b67cb888 --- /dev/null +++ b/utils/cache_file_info.go @@ -0,0 +1,69 @@ +package utils + +import ( + "encoding/json" + "os" + "path/filepath" + "time" +) + +type LoadInstance struct { + LoadId string // Unique identifier for the load + TariffPlanId string // Tariff plan identificator for the data loaded + LoadTime time.Time // Time of load +} + +type CacheFileInfo struct { + Encoding string + LoadInfo *LoadInstance +} + +func LoadCacheFileInfo(path string) (*CacheFileInfo, error) { + // open data file + dataFile, err := os.Open(filepath.Join(path, "cache.info")) + defer dataFile.Close() + if err != nil { + Logger.Err(": " + err.Error()) + return nil, err + } + + filesInfo := &CacheFileInfo{} + dataDecoder := json.NewDecoder(dataFile) + err = dataDecoder.Decode(filesInfo) + if err != nil { + Logger.Err(": " + err.Error()) + return nil, err + } + return filesInfo, nil +} + +func SaveCacheFileInfo(path string, cfi *CacheFileInfo) error { + // open data file + // create a the path + if err := os.MkdirAll(path, 0766); err != nil { + Logger.Err(":" + err.Error()) + return err + } + + dataFile, err := os.Create(filepath.Join(path, "cache.info")) + defer dataFile.Close() + if err != nil { + Logger.Err(":" + err.Error()) + return err + } + + // serialize the data + dataEncoder := json.NewEncoder(dataFile) + if err := dataEncoder.Encode(cfi); err != nil { + Logger.Err(":" + err.Error()) + return err + } + return nil +} + +func CacheFileExists(filePath string) bool { + if _, err := os.Stat(filePath); err == nil { + return true + } + return false +}