diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 4b56f3da2..00ab7e20a 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -21,7 +21,6 @@ package v1 import ( "errors" "fmt" - "log" "os" "path" "strconv" @@ -1024,7 +1023,6 @@ func (arrp *AttrRemoveRatingProfile) GetId() (result string) { } func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *string) error { - log.Printf("ATTR: %+v", attr) if attr.Direction == "" { attr.Direction = utils.OUT } @@ -1034,7 +1032,6 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st return utils.ErrMandatoryIeMissing } _, err := engine.Guardian.Guard(func() (interface{}, error) { - log.Print("RPID: ", attr.GetId()) err := self.RatingDb.RemoveRatingProfile(attr.GetId()) if err != nil { return 0, err diff --git a/engine/cache.go b/engine/cache.go index 46f524a07..1f564e76d 100644 --- a/engine/cache.go +++ b/engine/cache.go @@ -1,11 +1,7 @@ //Simple caching library with expiration capabilities package engine -import ( - "sync" - - "github.com/cgrates/cgrates/utils" -) +import "sync" const ( PREFIX_LEN = 4 @@ -25,6 +21,7 @@ var ( transactionMux sync.Mutex transactionON = false transactionLock = false + dumper *cacheDumper ) type transactionItem struct { @@ -33,6 +30,13 @@ type transactionItem struct { kind string } +func CacheSetDumperPath(path string) (err error) { + if dumper == nil { + dumper, err = newCacheDumper(path) + } + return +} + func init() { if DOUBLE_CACHE { cache = newDoubleStore() @@ -78,12 +82,6 @@ func CacheCommitTransaction() { transactionMux.Unlock() } -func CacheSave(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error { - mux.Lock() - defer mux.Unlock() - return cache.Save(path, keys, cfi) -} - func CacheLoad(path string, keys []string) error { if !transactionLock { mux.Lock() diff --git a/engine/cache_dumper.go b/engine/cache_dumper.go new file mode 100644 index 000000000..326ba8d3f --- /dev/null +++ b/engine/cache_dumper.go @@ -0,0 +1,148 @@ +package engine + +import ( + "bytes" + "compress/zlib" + "io/ioutil" + "os" + "path/filepath" + + "github.com/cgrates/cgrates/utils" + "github.com/syndtr/goleveldb/leveldb" +) + +type cacheDumper struct { + path string + dbMap map[string]*leveldb.DB + dataEncoder Marshaler +} + +func newCacheDumper(path string) (*cacheDumper, error) { + if path != "" { + if err := os.MkdirAll(path, 0766); err != nil { + return nil, err + } + } + return &cacheDumper{ + path: path, + dbMap: make(map[string]*leveldb.DB), + dataEncoder: NewCodecMsgpackMarshaler(), + }, nil +} + +func (cd *cacheDumper) getDumbDb(prefix string) (*leveldb.DB, error) { + if cd == nil || cd.path == "" { + return nil, nil + } + db, found := cd.dbMap[prefix] + if !found { + var err error + db, err = leveldb.OpenFile(filepath.Join(cd.path, prefix+".cache"), nil) + if err != nil { + return nil, err + } + cd.dbMap[prefix] = db + } + return db, nil +} + +func (cd *cacheDumper) put(prefix, key string, value interface{}) error { + db, err := cd.getDumbDb(prefix) + if err != nil || db == nil { + return err + } + encData, err := cd.dataEncoder.Marshal(value) + if err != nil { + return err + } + if len(encData) > 1000 { + var buf bytes.Buffer + w := zlib.NewWriter(&buf) + w.Write(encData) + w.Close() + encData = buf.Bytes() + } + return db.Put([]byte(key), encData, nil) +} + +func (cd *cacheDumper) load(prefix string) (map[string]interface{}, error) { + db, err := cd.getDumbDb(prefix) + if err != nil || db == nil { + return nil, err + } + val := make(map[string]interface{}) + iter := db.NewIterator(nil, nil) + for iter.Next() { + k := iter.Key() + data := iter.Value() + var encData []byte + if data[0] == 120 && data[1] == 156 { //zip header + x := bytes.NewBuffer(data) + r, err := zlib.NewReader(x) + if err != nil { + utils.Logger.Info(": " + err.Error()) + break + } + out, err := ioutil.ReadAll(r) + if err != nil { + utils.Logger.Info(": " + err.Error()) + break + } + r.Close() + encData = out + } else { + encData = data + } + v := cd.cacheTypeFactory(prefix) + if err := cd.dataEncoder.Unmarshal(encData, &v); err != nil { + return nil, err + } + val[string(k)] = v + } + iter.Release() + return val, nil +} + +func (cd *cacheDumper) delete(prefix, key string) error { + db, err := cd.getDumbDb(prefix) + if err != nil || db == nil { + return err + } + return db.Delete([]byte(key), nil) +} + +func (cd *cacheDumper) deleteAll(prefix string) error { + db, err := cd.getDumbDb(prefix) + if err != nil || db == nil { + return err + } + db.Close() + delete(cd.dbMap, prefix) + return os.RemoveAll(filepath.Join(cd.path, prefix+".cache")) +} + +func (cd *cacheDumper) cacheTypeFactory(prefix string) interface{} { + switch prefix { + case utils.DESTINATION_PREFIX: + return make(map[string]struct{}) + case utils.RATING_PLAN_PREFIX: + return &RatingPlan{} + case utils.RATING_PROFILE_PREFIX: + return &RatingProfile{} + case utils.LCR_PREFIX: + return &LCR{} + case utils.DERIVEDCHARGERS_PREFIX: + return &utils.DerivedChargers{} + case utils.ACTION_PREFIX: + return Actions{} + case utils.ACTION_PLAN_PREFIX: + return &ActionPlan{} + case utils.SHARED_GROUP_PREFIX: + return &SharedGroup{} + case utils.ALIASES_PREFIX: + return AliasValues{} + case utils.LOADINST_KEY[:PREFIX_LEN]: + return make([]*utils.LoadInstance, 0) + } + return nil +} diff --git a/engine/cache_factory.go b/engine/cache_factory.go deleted file mode 100644 index cb64892d4..000000000 --- a/engine/cache_factory.go +++ /dev/null @@ -1,29 +0,0 @@ -package engine - -import "github.com/cgrates/cgrates/utils" - -func CacheTypeFactory(prefix string) interface{} { - switch prefix { - case utils.DESTINATION_PREFIX: - return make(map[string]struct{}) - case utils.RATING_PLAN_PREFIX: - return &RatingPlan{} - case utils.RATING_PROFILE_PREFIX: - return &RatingProfile{} - case utils.LCR_PREFIX: - return &LCR{} - case utils.DERIVEDCHARGERS_PREFIX: - return &utils.DerivedChargers{} - case utils.ACTION_PREFIX: - return Actions{} - case utils.ACTION_PLAN_PREFIX: - return &ActionPlan{} - case utils.SHARED_GROUP_PREFIX: - return &SharedGroup{} - case utils.ALIASES_PREFIX: - return AliasValues{} - case utils.LOADINST_KEY[:PREFIX_LEN]: - return make([]*utils.LoadInstance, 0) - } - return nil -} diff --git a/engine/cache_store.go b/engine/cache_store.go index a01ae7480..e81eb7471 100644 --- a/engine/cache_store.go +++ b/engine/cache_store.go @@ -2,11 +2,7 @@ package engine import ( - "bytes" - "compress/zlib" "fmt" - "io/ioutil" - "log" "os" "path/filepath" "strings" @@ -14,7 +10,6 @@ import ( "time" "github.com/cgrates/cgrates/utils" - "github.com/syndtr/goleveldb/leveldb" ) type cacheStore interface { @@ -27,7 +22,6 @@ type cacheStore interface { CountEntriesForPrefix(string) int GetAllForPrefix(string) (map[string]interface{}, error) GetKeysForPrefix(string) []string - Save(string, map[string][]string, *utils.CacheFileInfo) error Load(string, []string) error } @@ -46,6 +40,9 @@ func (cs cacheDoubleStore) Put(key string, value interface{}) { cs[prefix] = mp } mp[key] = value + if err := dumper.put(prefix, key, value); err != nil { + utils.Logger.Info(" put error: " + err.Error()) + } } func (cs cacheDoubleStore) Get(key string) (interface{}, error) { @@ -87,11 +84,18 @@ func (cs cacheDoubleStore) Delete(key string) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] if keyMap, ok := cs[prefix]; ok { delete(keyMap, key) + if err := dumper.delete(prefix, key); err != nil { + utils.Logger.Info(" delete error: " + err.Error()) + } + } } func (cs cacheDoubleStore) DeletePrefix(prefix string) { delete(cs, prefix) + if err := dumper.deleteAll(prefix); err != nil { + utils.Logger.Info(" delete all error: " + err.Error()) + } } func (cs cacheDoubleStore) CountEntriesForPrefix(prefix string) int { @@ -120,71 +124,6 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) { return } -func (cs cacheDoubleStore) Save(path string, prefixKeysMap map[string][]string, cfi *utils.CacheFileInfo) error { - start := time.Now() - //log.Printf("path: %s prefixes: %v", path, prefixes) - if path == "" || len(prefixKeysMap) == 0 { - return nil - } - //log.Print("saving cache prefixes: ", prefixes) - // create a the path - if err := os.MkdirAll(path, 0766); err != nil { - utils.Logger.Info(":" + err.Error()) - return err - } - - var wg sync.WaitGroup - var prefixSlice []string - for prefix, keys := range prefixKeysMap { - prefix = prefix[:PREFIX_LEN] - mapValue, found := cs[prefix] - if !found { - continue - } - prefixSlice = append(prefixSlice, prefix) - wg.Add(1) - go func(pref string, ks []string, data map[string]interface{}) { - defer wg.Done() - dataEncoder := NewCodecMsgpackMarshaler() - db, err := leveldb.OpenFile(filepath.Join(path, pref+".cache"), nil) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - // destinations are reverse mapped - if pref == utils.DESTINATION_PREFIX { - ks = make([]string, len(cs[utils.DESTINATION_PREFIX])) - i := 0 - for dk := range cs[utils.DESTINATION_PREFIX] { - ks[i] = dk - i++ - } - } - for _, k := range ks { - v := data[k] - if encData, err := dataEncoder.Marshal(v); err == nil { - if len(encData) > 1000 { - var buf bytes.Buffer - w := zlib.NewWriter(&buf) - w.Write(encData) - w.Close() - encData = buf.Bytes() - } - db.Put([]byte(k), encData, nil) - } else { - utils.Logger.Info(":" + err.Error()) - break - } - } - }(prefix, keys, mapValue) - } - wg.Wait() - - utils.Logger.Info(fmt.Sprintf("Cache %v save time: %v", prefixSlice, time.Since(start))) - return utils.SaveCacheFileInfo(path, cfi) -} - func (cs cacheDoubleStore) Load(path string, prefixes []string) error { if path == "" || len(prefixes) == 0 { return nil @@ -199,53 +138,15 @@ func (cs cacheDoubleStore) Load(path string, prefixes []string) error { continue } wg.Add(1) - go func(dirPath, key string) { + go func(dirPath, pref string) { defer wg.Done() - db, err := leveldb.OpenFile(dirPath, nil) + val, err := dumper.load(pref) if err != nil { - utils.Logger.Info(": " + err.Error()) + utils.Logger.Info(" load error: " + err.Error()) return } - defer db.Close() - dataDecoder := NewCodecMsgpackMarshaler() - val := make(map[string]interface{}) - iter := db.NewIterator(nil, nil) - for iter.Next() { - // Remember that the contents of the returned slice should not be modified, and - // only valid until the next call to Next. - k := iter.Key() - data := iter.Value() - var encData []byte - if data[0] == 120 && data[1] == 156 { //zip header - x := bytes.NewBuffer(data) - r, err := zlib.NewReader(x) - if err != nil { - //log.Printf("%s err3", key) - utils.Logger.Info(": " + err.Error()) - break - } - out, err := ioutil.ReadAll(r) - if err != nil { - //log.Printf("%s err4", key) - utils.Logger.Info(": " + err.Error()) - break - } - r.Close() - encData = out - } else { - encData = data - } - v := CacheTypeFactory(key) - if err := dataDecoder.Unmarshal(encData, &v); err != nil { - //log.Printf("%s err5", key) - utils.Logger.Info(": " + err.Error()) - break - } - val[string(k)] = v - } - iter.Release() mux.Lock() - cs[key] = val + cs[pref] = val mux.Unlock() }(p, prefix) } @@ -379,11 +280,6 @@ func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) { return } -func (cs cacheSimpleStore) Save(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error { - utils.Logger.Info("simplestore save") - return nil -} - func (cs cacheSimpleStore) Load(path string, keys []string) error { utils.Logger.Info("simplestore load") return nil diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index a708fe965..4ad34d66f 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -287,6 +287,11 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { return nil, err } + if cacheDumpDir != "" { + if err := CacheSetDumperPath(cacheDumpDir); err != nil { + utils.Logger.Info(" init error: " + err.Error()) + } + } return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, err } @@ -665,32 +670,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) return err } - keys := make(map[string][]string) - if len(dKeys) > 0 { - keys[utils.DESTINATION_PREFIX] = dKeys - } - if len(rpKeys) > 0 { - keys[utils.RATING_PLAN_PREFIX] = rpKeys - } - if len(rpfKeys) > 0 { - keys[utils.RATING_PROFILE_PREFIX] = rpfKeys - } - if len(lcrKeys) > 0 { - keys[utils.LCR_PREFIX] = lcrKeys - } - if len(actKeys) > 0 { - keys[utils.ACTION_PREFIX] = actKeys - } - if len(dcsKeys) > 0 { - keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys - } - if len(aplKeys) > 0 { - keys[utils.ACTION_PLAN_PREFIX] = aplKeys - } - if len(shgKeys) > 0 { - keys[utils.SHARED_GROUP_PREFIX] = shgKeys - } - return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) + return utils.SaveCacheFileInfo(ms.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist}) } func (ms *MongoStorage) CacheAccountingAll() error { @@ -794,7 +774,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) return err } - return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) + return utils.SaveCacheFileInfo(ms.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist}) } func (ms *MongoStorage) HasData(category, subject string) (bool, error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a8a85bc81..da66ff002 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -74,6 +74,11 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i } else { return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) } + if cacheDumpDir != "" { + if err := CacheSetDumperPath(cacheDumpDir); err != nil { + utils.Logger.Info(" init error: " + err.Error()) + } + } return &RedisStorage{db: p, ms: mrshler, cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, nil } @@ -340,32 +345,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac return err } - keys := make(map[string][]string) - if len(dKeys) > 0 { - keys[utils.DESTINATION_PREFIX] = dKeys - } - if len(rpKeys) > 0 { - keys[utils.RATING_PLAN_PREFIX] = rpKeys - } - if len(rpfKeys) > 0 { - keys[utils.RATING_PROFILE_PREFIX] = rpfKeys - } - if len(lcrKeys) > 0 { - keys[utils.LCR_PREFIX] = lcrKeys - } - if len(actKeys) > 0 { - keys[utils.ACTION_PREFIX] = actKeys - } - if len(dcsKeys) > 0 { - keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys - } - if len(aplKeys) > 0 { - keys[utils.ACTION_PLAN_PREFIX] = aplKeys - } - if len(shgKeys) > 0 { - keys[utils.SHARED_GROUP_PREFIX] = shgKeys - } - return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) + return utils.SaveCacheFileInfo(rs.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist}) } func (rs *RedisStorage) CacheAccountingAll() error { @@ -467,7 +447,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) return err } - return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) + return utils.SaveCacheFileInfo(rs.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist}) } // Used to check if specific subject is stored using prefix key attached to entity