From 58e9acae7425f8cf6837bbbe1c56ef0b606bde8c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 30 Jun 2016 18:07:40 +0300 Subject: [PATCH] manual tests ok --- apier/v1/apier.go | 18 +++++++++--------- apier/v2/apier.go | 8 ++++---- cmd/cgr-engine/cgr-engine.go | 17 +++-------------- cmd/cgr-engine/rater.go | 16 ---------------- cmd/cgr-loader/cgr-loader.go | 10 +++++----- cmd/cgr-tester/cgr-tester.go | 8 ++++---- engine/cache.go | 4 ++++ engine/cache_store.go | 7 +++---- engine/calldesc.go | 8 ++++---- engine/calldesc_test.go | 30 ++++++++++++++++++++++++++++++ engine/libtest.go | 8 ++++---- engine/loader_csv_test.go | 8 +++++--- engine/loader_local_test.go | 18 +++++++++--------- engine/rateinterval_test.go | 18 ++++++++++++++++-- engine/storage_mongo_datadb.go | 26 +++++++++++++++++++------- engine/storage_redis.go | 24 +++++++++++++++++++----- engine/storage_redis_local_test.go | 2 +- engine/storage_test.go | 2 +- engine/storage_utils.go | 18 +++++++++--------- engine/tp_reader.go | 21 +-------------------- 20 files changed, 150 insertions(+), 121 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 4b74fd03f..4b56f3da2 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -137,7 +137,7 @@ func (self *ApierV1) LoadDestination(attrs AttrLoadDestination, reply *string) e if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if loaded, err := dbReader.LoadDestinationsFiltered(attrs.DestinationId); err != nil { return utils.NewErrServerError(err) } else if !loaded { @@ -161,7 +161,7 @@ func (self *ApierV1) LoadDerivedChargers(attrs utils.TPDerivedChargers, reply *s if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) dc := engine.APItoModelDerivedCharger(&attrs) if err := dbReader.LoadDerivedChargersFiltered(&dc[0], true); err != nil { return utils.NewErrServerError(err) @@ -188,7 +188,7 @@ func (self *ApierV1) LoadRatingPlan(attrs AttrLoadRatingPlan, reply *string) err if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if loaded, err := dbReader.LoadRatingPlansFiltered(attrs.RatingPlanId); err != nil { return utils.NewErrServerError(err) } else if !loaded { @@ -218,7 +218,7 @@ func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *strin if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) rp := engine.APItoModelRatingProfile(&attrs) if err := dbReader.LoadRatingProfilesFiltered(&rp[0]); err != nil { return utils.NewErrServerError(err) @@ -245,7 +245,7 @@ func (self *ApierV1) LoadSharedGroup(attrs AttrLoadSharedGroup, reply *string) e if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if err := dbReader.LoadSharedGroupsFiltered(attrs.SharedGroupId, true); err != nil { return utils.NewErrServerError(err) } @@ -271,7 +271,7 @@ func (self *ApierV1) LoadCdrStats(attrs AttrLoadCdrStats, reply *string) error { if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if err := dbReader.LoadCdrStatsFiltered(attrs.CdrStatsId, true); err != nil { return utils.NewErrServerError(err) } @@ -291,7 +291,7 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if err := dbReader.LoadAll(); err != nil { return utils.NewErrServerError(err) } @@ -700,7 +700,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if _, err := engine.Guardian.Guard(func() (interface{}, error) { aas := engine.APItoModelAccountAction(&attrs) if err := dbReader.LoadAccountActionsFiltered(aas); err != nil { @@ -880,7 +880,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.CDR_STATS_CSV), path.Join(attrs.FolderPath, utils.USERS_CSV), path.Join(attrs.FolderPath, utils.ALIASES_CSV), - ), "", self.Config.DefaultTimezone, self.Config.LoadHistorySize) + ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 8997a3aaf..b9be1e9ba 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -48,7 +48,7 @@ func (self *ApierV2) LoadRatingProfile(attrs AttrLoadRatingProfile, reply *strin tpRpf := &utils.TPRatingProfile{TPid: attrs.TPid} tpRpf.SetRatingProfilesId(attrs.RatingProfileId) rpf := engine.APItoModelRatingProfile(tpRpf) - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if err := dbReader.LoadRatingProfilesFiltered(&rpf[0]); err != nil { return utils.NewErrServerError(err) } @@ -74,7 +74,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str if len(attrs.TPid) == 0 { return utils.NewErrMandatoryIeMissing("TPid") } - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) tpAa := &utils.TPAccountActions{TPid: attrs.TPid} tpAa.SetAccountActionsId(attrs.AccountActionsId) aa := engine.APItoModelAccountAction(tpAa) @@ -111,7 +111,7 @@ func (self *ApierV2) LoadDerivedChargers(attrs AttrLoadDerivedChargers, reply *s tpDc := &utils.TPDerivedChargers{TPid: attrs.TPid} tpDc.SetDerivedChargersId(attrs.DerivedChargersId) dc := engine.APItoModelDerivedCharger(tpDc) - dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone, self.Config.LoadHistorySize) + dbReader := engine.NewTpReader(self.RatingDb, self.AccountDb, self.StorDb, attrs.TPid, self.Config.DefaultTimezone) if err := dbReader.LoadDerivedChargersFiltered(&dc[0], true); err != nil { return utils.NewErrServerError(err) } @@ -156,7 +156,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.CDR_STATS_CSV), path.Join(attrs.FolderPath, utils.USERS_CSV), path.Join(attrs.FolderPath, utils.ALIASES_CSV), - ), "", self.Config.DefaultTimezone, self.Config.LoadHistorySize) + ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 4cf0bc056..008b81c16 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -22,7 +22,6 @@ import ( "flag" "fmt" "log" - "path/filepath" // _ "net/http/pprof" "os" "runtime" @@ -402,29 +401,19 @@ func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection, internalAliaseSChan <- aliasesServer return } - - start := time.Now() cfi, err := utils.LoadCacheFileInfo(cfg.CacheDumpDir) - if err != nil || cfi.LoadInfo.AccountingLoadID != loadHist[0].AccountingLoadID || !utils.CacheFileExists(filepath.Join(cfg.CacheDumpDir, utils.ALIASES_PREFIX+".cache")) { + if err != nil || cfi.LoadInfo.AccountingLoadID != loadHist[0].AccountingLoadID { if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true return } - utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start))) - - start = time.Now() - if err := engine.CacheSave(cfg.CacheDumpDir, []string{utils.ALIASES_PREFIX}, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil { - utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error())) - } - utils.Logger.Info(fmt.Sprintf("Cache accounting save time: %v", time.Since(start))) } else { if err := engine.CacheLoad(cfg.CacheDumpDir, []string{utils.ALIASES_PREFIX}); err != nil { utils.Logger.Crit("could not load cache file: " + err.Error()) exitChan <- true return } - utils.Logger.Info(fmt.Sprintf("Cache accounting load time: %v", time.Since(start))) } internalAliaseSChan <- aliasesServer } @@ -530,7 +519,7 @@ func main() { var cdrDb engine.CdrStorage if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort, - cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir) + cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir, cfg.LoadHistorySize) if err != nil { // Cannot configure getter database, show stopper utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return @@ -540,7 +529,7 @@ func main() { } if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled { accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, - cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir) + cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir, cfg.LoadHistorySize) if err != nil { // Cannot configure getter database, show stopper utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index f23af4699..ff2d67265 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -71,8 +71,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC cacheDoneChan <- struct{}{} return } - - start := time.Now() cfi, err := utils.LoadCacheFileInfo(cfg.CacheDumpDir) if err != nil || cfi.LoadInfo.RatingLoadID != loadHist[0].RatingLoadID { if err := ratingDb.CacheRatingAll(); err != nil { @@ -80,26 +78,12 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - /*if err := accountDb.CacheAccountingPrefixes(); err != nil { // Used to cache load history - utils.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - exitChan <- true - return - }*/ - - utils.Logger.Info(fmt.Sprintf("Cache rating creation time: %v", time.Since(start))) - - start = time.Now() - if err := engine.CacheSave(cfg.CacheDumpDir, []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist[0]}); err != nil { - utils.Logger.Emerg(fmt.Sprintf("could not save cache file: " + err.Error())) - } - utils.Logger.Info(fmt.Sprintf("Cache rating save time: %v", time.Since(start))) } else { if err := engine.CacheLoad(cfg.CacheDumpDir, []string{utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.SHARED_GROUP_PREFIX}); err != nil { utils.Logger.Crit("could not load cache file: " + err.Error()) exitChan <- true return } - utils.Logger.Info(fmt.Sprintf("Cache rating load time: %v", time.Since(start))) } cacheDoneChan <- struct{}{} }() diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 22f7af185..8ef7f8fbf 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -202,8 +202,8 @@ func main() { if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb if *fromStorDb { ratingDb, errRatingDb = engine.ConfigureRatingStorage(*tpdb_type, *tpdb_host, *tpdb_port, *tpdb_name, - *tpdb_user, *tpdb_pass, *dbdata_encoding, *cacheDumpDir) - accountDb, errAccDb = engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, *cacheDumpDir) + *tpdb_user, *tpdb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) + accountDb, errAccDb = engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding, cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBCDRSIndexes) } else if *toStorDb { // Import from csv files to storDb @@ -211,8 +211,8 @@ func main() { cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBCDRSIndexes) } else { // Default load from csv files to dataDb ratingDb, errRatingDb = engine.ConfigureRatingStorage(*tpdb_type, *tpdb_host, *tpdb_port, *tpdb_name, - *tpdb_user, *tpdb_pass, *dbdata_encoding, *cacheDumpDir) - accountDb, errAccDb = engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, *cacheDumpDir) + *tpdb_user, *tpdb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) + accountDb, errAccDb = engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) } // Defer databases opened to be closed when we are done for _, db := range []engine.Storage{ratingDb, accountDb, storDb} { @@ -272,7 +272,7 @@ func main() { path.Join(*dataPath, utils.ALIASES_CSV), ) } - tpReader := engine.NewTpReader(ratingDb, accountDb, loader, *tpid, *timezone, *loadHistorySize) + tpReader := engine.NewTpReader(ratingDb, accountDb, loader, *tpid, *timezone) err = tpReader.LoadAll() if err != nil { log.Fatal(err) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 3e4079032..a0df865d9 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -61,18 +61,18 @@ var ( destination = flag.String("destination", "1002", "The destination to use in queries.") json = flag.Bool("json", false, "Use JSON RPC") cacheDumpDir = flag.String("cache_dump_dir", cgrConfig.CacheDumpDir, "Folder to store cache dump for fast reload") - - nilDuration = time.Duration(0) + loadHistorySize = flag.Int("load_history_size", cgrConfig.LoadHistorySize, "Limit the number of records in the load history") + nilDuration = time.Duration(0) ) func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { - ratingDb, err := engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding, *cacheDumpDir) + ratingDb, err := engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) if err != nil { return nilDuration, fmt.Errorf("Could not connect to rating database: %s", err.Error()) } defer ratingDb.Close() engine.SetRatingStorage(ratingDb) - accountDb, err := engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding, *cacheDumpDir) + accountDb, err := engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding, *cacheDumpDir, *loadHistorySize) if err != nil { return nilDuration, fmt.Errorf("Could not connect to accounting database: %s", err.Error()) } diff --git a/engine/cache.go b/engine/cache.go index 3c0423b20..ac171ddf2 100644 --- a/engine/cache.go +++ b/engine/cache.go @@ -2,7 +2,9 @@ package engine import ( + "fmt" "sync" + "time" "github.com/cgrates/cgrates/utils" ) @@ -85,6 +87,7 @@ func CacheSave(path string, keys []string, cfi *utils.CacheFileInfo) error { } func CacheLoad(path string, keys []string) error { + start := time.Now() if !transactionLock { mux.Lock() defer mux.Unlock() @@ -92,6 +95,7 @@ func CacheLoad(path string, keys []string) error { if !transactionON { return cache.Load(path, keys) } + utils.Logger.Info(fmt.Sprintf("Cache rating load time: %v", time.Since(start))) return nil } diff --git a/engine/cache_store.go b/engine/cache_store.go index cea51ab13..d1246ec35 100644 --- a/engine/cache_store.go +++ b/engine/cache_store.go @@ -7,7 +7,6 @@ import ( "compress/zlib" "fmt" "io/ioutil" - "log" "os" "path/filepath" "strconv" @@ -121,11 +120,11 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) { } func (cs cacheDoubleStore) Save(path string, prefixes []string, cfi *utils.CacheFileInfo) error { - log.Printf("path: %s prefixes: %v", path, prefixes) + //log.Printf("path: %s prefixes: %v", path, prefixes) if path == "" || len(prefixes) == 0 { return nil } - log.Print("saving cache prefixes: ", prefixes) + //log.Print("saving cache prefixes: ", prefixes) // create a the path if err := os.MkdirAll(path, 0766); err != nil { utils.Logger.Info(":" + err.Error()) @@ -243,7 +242,7 @@ func (cs cacheDoubleStore) Load(path string, prefixes []string) error { } kv := CacheTypeFactory(key, "", nil) if err := dataDecoder.Unmarshal(encData, &kv); err != nil { - log.Printf("%s err5", key) + //log.Printf("%s err5", key) utils.Logger.Info(": " + err.Error()) break } diff --git a/engine/calldesc.go b/engine/calldesc.go index 78c850b43..9b8f43d19 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -46,20 +46,20 @@ func init() { ratingStorage, _ = NewMapStorage() accountingStorage, _ = NewMapStorage() case "mongo": - ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", nil, "") + ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", nil, "", 10) if err != nil { log.Fatal(err) } - accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", nil, "") + accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", nil, "", 10) if err != nil { log.Fatal(err) } case "redis": - ratingStorage, _ = NewRedisStorage("127.0.0.1:6379", 12, "", utils.MSGPACK, utils.REDIS_MAX_CONNS, "") + ratingStorage, _ = NewRedisStorage("127.0.0.1:6379", 12, "", utils.MSGPACK, utils.REDIS_MAX_CONNS, "", 10) if err != nil { log.Fatal(err) } - accountingStorage, _ = NewRedisStorage("127.0.0.1:6379", 13, "", utils.MSGPACK, utils.REDIS_MAX_CONNS, "") + accountingStorage, _ = NewRedisStorage("127.0.0.1:6379", 13, "", utils.MSGPACK, utils.REDIS_MAX_CONNS, "", 10) if err != nil { log.Fatal(err) } diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index f2dbbdd08..8ecf80774 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -272,6 +272,36 @@ func TestGetCost(t *testing.T) { } } +func TestGetCostRounding(t *testing.T) { + t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2017, time.February, 2, 17, 33, 0, 0, time.UTC) + cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "round", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} + result, _ := cd.GetCost() + if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :( + t.Error("bad cost", utils.ToIJSON(result)) + } +} + +func TestDebitRounding(t *testing.T) { + t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2017, time.February, 2, 17, 33, 0, 0, time.UTC) + cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "round", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} + result, _ := cd.Debit() + if result.Cost != 0.30006 || result.GetConnectFee() != 0 { // should be 0.3 :( + t.Error("bad cost", utils.ToIJSON(result)) + } +} + +func TestDebitPerformRounding(t *testing.T) { + t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2017, time.February, 2, 17, 33, 0, 0, time.UTC) + cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "round", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0, PerformRounding: true} + result, _ := cd.Debit() + if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :( + t.Error("bad cost", utils.ToIJSON(result)) + } +} + func TestGetCostZero(t *testing.T) { t1 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC) t2 := time.Date(2012, time.February, 2, 17, 30, 0, 0, time.UTC) diff --git a/engine/libtest.go b/engine/libtest.go index 18e3e1174..1df886bdf 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -33,12 +33,12 @@ import ( ) func InitDataDb(cfg *config.CGRConfig) error { - ratingDb, err := ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort, cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, "") + ratingDb, err := ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort, cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir, cfg.LoadHistorySize) if err != nil { return err } accountDb, err := ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, cfg.DataDbName, - cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, "") + cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheDumpDir, cfg.LoadHistorySize) if err != nil { return err } @@ -91,7 +91,7 @@ func StopStartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) { return StartEngine(cfgPath, waitEngine) } -func LoadTariffPlanFromFolder(tpPath, timezone string, loadHistSize int, ratingDb RatingStorage, accountingDb AccountingStorage) error { +func LoadTariffPlanFromFolder(tpPath, timezone string, ratingDb RatingStorage, accountingDb AccountingStorage) error { loader := NewTpReader(ratingDb, accountingDb, NewFileCSVStorage(utils.CSV_SEP, path.Join(tpPath, utils.DESTINATIONS_CSV), path.Join(tpPath, utils.TIMINGS_CSV), @@ -110,7 +110,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, loadHistSize int, ratingD path.Join(tpPath, utils.USERS_CSV), path.Join(tpPath, utils.ALIASES_CSV), - ), "", timezone, loadHistSize) + ), "", timezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index c26038562..613679e4d 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -150,6 +150,7 @@ DY_PLAN,RT_DY,*any,10 *out,cgrates.org,call,money,2015-02-28T00:00:00Z,EVENING,, *out,cgrates.org,call,dy,2015-02-28T00:00:00Z,DY_PLAN,, *out,cgrates.org,call,block,2015-02-28T00:00:00Z,DY_PLAN,, +*out,cgrates.org,call,round,2016-06-30T00:00:00Z,DEFAULT,, ` sharedGroups = ` SG1,*any,*lowest, @@ -226,6 +227,7 @@ cgrates.org,block_empty,BLOCK_EMPTY_AT,,false,false cgrates.org,expo,EXP_AT,,false,false cgrates.org,expnoexp,,,false,false cgrates.org,vf,,,false,false +cgrates.org,round,TOPUP10_AT,,false,false ` derivedCharges = ` @@ -268,7 +270,7 @@ var csvr *TpReader func init() { csvr = NewTpReader(ratingStorage, accountingStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases), "", "", 10) + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases), "", "") if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -809,7 +811,7 @@ func TestLoadRatingPlans(t *testing.T) { } func TestLoadRatingProfiles(t *testing.T) { - if len(csvr.ratingProfiles) != 23 { + if len(csvr.ratingProfiles) != 24 { t.Error("Failed to load rating profiles: ", len(csvr.ratingProfiles), csvr.ratingProfiles) } rp := csvr.ratingProfiles["*out:test:0:trp"] @@ -1109,7 +1111,7 @@ func TestLoadActionTriggers(t *testing.T) { } func TestLoadAccountActions(t *testing.T) { - if len(csvr.accountActions) != 16 { + if len(csvr.accountActions) != 17 { t.Error("Failed to load account actions: ", len(csvr.accountActions)) } aa := csvr.accountActions["vdf:minitsboy"] diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 2fe0e687b..dcb334004 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -57,25 +57,25 @@ func TestConnDataDbs(t *testing.T) { } lCfg, _ = config.NewDefaultCGRConfig() var err error - if ratingDbCsv, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "4", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, ""); err != nil { + if ratingDbCsv, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "4", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } - if ratingDbStor, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "5", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, ""); err != nil { + if ratingDbStor, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "5", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } - if ratingDbApier, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "6", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, ""); err != nil { + if ratingDbApier, err = ConfigureRatingStorage(lCfg.TpDbType, lCfg.TpDbHost, lCfg.TpDbPort, "6", lCfg.TpDbUser, lCfg.TpDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } if accountDbCsv, err = ConfigureAccountingStorage(lCfg.DataDbType, lCfg.DataDbHost, lCfg.DataDbPort, "7", - lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, ""); err != nil { + lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } if accountDbStor, err = ConfigureAccountingStorage(lCfg.DataDbType, lCfg.DataDbHost, lCfg.DataDbPort, "8", - lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, ""); err != nil { + lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } if accountDbApier, err = ConfigureAccountingStorage(lCfg.DataDbType, lCfg.DataDbHost, lCfg.DataDbPort, "9", - lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, ""); err != nil { + lCfg.DataDbUser, lCfg.DataDbPass, lCfg.DBDataEncoding, "", 1); err != nil { t.Fatal("Error on ratingDb connection: ", err.Error()) } for _, db := range []Storage{ratingDbCsv, ratingDbStor, ratingDbApier, accountDbCsv, accountDbStor, accountDbApier} { @@ -133,7 +133,7 @@ func TestLoadFromCSV(t *testing.T) { path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.CDR_STATS_CSV), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.USERS_CSV), path.Join(*dataDir, "tariffplans", *tpCsvScenario, utils.ALIASES_CSV), - ), "", "", lCfg.LoadHistorySize) + ), "", "") if err = loader.LoadDestinations(); err != nil { t.Error("Failed loading destinations: ", err.Error()) @@ -209,7 +209,7 @@ func TestLoadFromStorDb(t *testing.T) { if !*testLocal { return } - loader := NewTpReader(ratingDbStor, accountDbStor, storDb, utils.TEST_SQL, "", lCfg.LoadHistorySize) + loader := NewTpReader(ratingDbStor, accountDbStor, storDb, utils.TEST_SQL, "") if err := loader.LoadDestinations(); err != nil { t.Error("Failed loading destinations: ", err.Error()) } @@ -261,7 +261,7 @@ func TestLoadIndividualProfiles(t *testing.T) { if !*testLocal { return } - loader := NewTpReader(ratingDbApier, accountDbApier, storDb, utils.TEST_SQL, "", lCfg.LoadHistorySize) + loader := NewTpReader(ratingDbApier, accountDbApier, storDb, utils.TEST_SQL, "") // Load ratingPlans. This will also set destination keys if ratingPlans, err := storDb.GetTpRatingPlans(utils.TEST_SQL, "", nil); err != nil { t.Fatal("Could not retrieve rating plans") diff --git a/engine/rateinterval_test.go b/engine/rateinterval_test.go index 8938dd5a6..f52be43b7 100644 --- a/engine/rateinterval_test.go +++ b/engine/rateinterval_test.go @@ -349,8 +349,22 @@ func TestRateIntervalCronEmpty(t *testing.T) { } } -func TestTimingIsActive(t *testing.T) { - +func TestRateIntervalCost(t *testing.T) { + ri := &RateInterval{ + Rating: &RIRate{ + Rates: RateGroups{ + &Rate{ + Value: 0.1, + RateIncrement: time.Second, + RateUnit: 60 * time.Second, + }, + }, + }, + } + x := ri.GetCost(60*time.Second, 0) + if x != 0.1 { + t.Error("expected 0.1 was: ", x) + } } /*********************************Benchmarks**************************************/ diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 24180e629..7fb797359 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -85,10 +85,11 @@ var ( ) type MongoStorage struct { - session *mgo.Session - db string - ms Marshaler - cacheDumpDir string + session *mgo.Session + db string + ms Marshaler + cacheDumpDir string + loadHistorySize int } func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { @@ -96,7 +97,7 @@ func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { return sessionCopy, sessionCopy.DB(ms.db).C(col) } -func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, cacheDumpDir string) (*MongoStorage, error) { +func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, cacheDumpDir string, loadHistorySize int) (*MongoStorage, error) { // We need this object to establish a session to our MongoDB. /*address := fmt.Sprintf("%s:%s", host, port) @@ -286,7 +287,7 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { return nil, err } - return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheDumpDir: cacheDumpDir}, err + return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, err } func (ms *MongoStorage) Close() { @@ -419,6 +420,7 @@ func (ms *MongoStorage) CacheRatingPrefixValues(prefixes map[string][]string) er } func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, actKeys, aplKeys, shgKeys []string) (err error) { + start := time.Now() CacheBeginTransaction() keyResult := struct{ Key string }{} idResult := struct{ Id string }{} @@ -642,6 +644,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Finished shared groups caching.") } CacheCommitTransaction() + utils.Logger.Info(fmt.Sprintf("Cache rating creation time: %v", time.Since(start))) loadHistList, err := ms.GetLoadHistory(1, true) if err != nil || len(loadHistList) == 0 { utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHistList, err)) @@ -658,6 +661,10 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac loadHist.RatingLoadID = utils.GenUUID() loadHist.LoadTime = time.Now() } + if err := ms.AddLoadHistory(loadHist, ms.loadHistorySize); err != nil { + utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) + return err + } var keys []string if len(dKeys) > 0 { keys = append(keys, utils.DESTINATION_PREFIX) @@ -714,6 +721,7 @@ func (ms *MongoStorage) CacheAccountingPrefixValues(prefixes map[string][]string } func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { + start := time.Now() CacheBeginTransaction() var keyResult struct{ Key string } if alsKeys == nil { @@ -761,6 +769,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { } utils.Logger.Info("Finished load history caching.") CacheCommitTransaction() + utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start))) var keys []string if len(alsKeys) > 0 { keys = append(keys, utils.ALIASES_PREFIX) @@ -778,7 +787,10 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { loadHist.AccountingLoadID = utils.GenUUID() loadHist.LoadTime = time.Now() } - + if err := ms.AddLoadHistory(loadHist, ms.loadHistorySize); err != nil { //FIXME replace 100 with cfg + utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) + return err + } return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 0eab0b906..f8c4539b4 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -36,12 +36,13 @@ var ( ) type RedisStorage struct { - db *pool.Pool - ms Marshaler - cacheDumpDir string + db *pool.Pool + ms Marshaler + cacheDumpDir string + loadHistorySize int } -func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheDumpDir string) (*RedisStorage, error) { +func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheDumpDir string, loadHistorySize int) (*RedisStorage, error) { df := func(network, addr string) (*redis.Client, error) { client, err := redis.Dial(network, addr) if err != nil { @@ -73,7 +74,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i } else { return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) } - return &RedisStorage{db: p, ms: mrshler, cacheDumpDir: cacheDumpDir}, nil + return &RedisStorage{db: p, ms: mrshler, cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, nil } func (rs *RedisStorage) Close() { @@ -140,6 +141,7 @@ func (rs *RedisStorage) CacheRatingPrefixValues(prefixes map[string][]string) er } func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, actKeys, aplKeys, shgKeys []string) (err error) { + start := time.Now() CacheBeginTransaction() conn, err := rs.db.Get() if err != nil { @@ -316,6 +318,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } CacheCommitTransaction() + utils.Logger.Info(fmt.Sprintf("Cache rating creation time: %v", time.Since(start))) loadHistList, err := rs.GetLoadHistory(1, true) if err != nil || len(loadHistList) == 0 { utils.Logger.Info(fmt.Sprintf("could not get load history: %v (%v)", loadHistList, err)) @@ -332,6 +335,11 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac loadHist.RatingLoadID = utils.GenUUID() loadHist.LoadTime = time.Now() } + if err := rs.AddLoadHistory(loadHist, rs.loadHistorySize); err != nil { + utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) + return err + } + var keys []string if len(dKeys) > 0 { keys = append(keys, utils.DESTINATION_PREFIX) @@ -388,6 +396,7 @@ func (rs *RedisStorage) CacheAccountingPrefixValues(prefixes map[string][]string } func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { + start := time.Now() CacheBeginTransaction() conn, err := rs.db.Get() if err != nil { @@ -430,6 +439,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { } utils.Logger.Info("Finished load history caching.") CacheCommitTransaction() + utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start))) var keys []string if len(alsKeys) > 0 { keys = append(keys, utils.ALIASES_PREFIX) @@ -450,6 +460,10 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { loadHist.AccountingLoadID = utils.GenUUID() loadHist.LoadTime = time.Now() } + if err := rs.AddLoadHistory(loadHist, rs.loadHistorySize); err != nil { + utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err)) + return err + } return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist}) } diff --git a/engine/storage_redis_local_test.go b/engine/storage_redis_local_test.go index b9360afcb..094ae9b36 100644 --- a/engine/storage_redis_local_test.go +++ b/engine/storage_redis_local_test.go @@ -35,7 +35,7 @@ func TestConnectRedis(t *testing.T) { return } cfg, _ := config.NewDefaultCGRConfig() - rds, err = NewRedisStorage(fmt.Sprintf("%s:%s", cfg.TpDbHost, cfg.TpDbPort), 4, cfg.TpDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, "") + rds, err = NewRedisStorage(fmt.Sprintf("%s:%s", cfg.TpDbHost, cfg.TpDbPort), 4, cfg.TpDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, "", 1) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/engine/storage_test.go b/engine/storage_test.go index 03e6818b0..56753d8f9 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -273,7 +273,7 @@ func TestDifferentUuid(t *testing.T) { func TestStorageTask(t *testing.T) { // clean previous unused tasks - for i := 0; i < 20; i++ { + for i := 0; i < 21; i++ { ratingStorage.PopTask() } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 219d7e240..eda2205b0 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -28,7 +28,7 @@ import ( // Various helpers to deal with database -func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler, cacheDumpDir string) (db RatingStorage, err error) { +func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler, cacheDumpDir string, loadHistorySize int) (db RatingStorage, err error) { var d Storage switch db_type { case utils.REDIS: @@ -41,9 +41,9 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler, ca if port != "" { host += ":" + port } - d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheDumpDir) + d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheDumpDir, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheDumpDir) + d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheDumpDir, loadHistorySize) db = d.(RatingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -55,7 +55,7 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler, ca return d.(RatingStorage), nil } -func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler, cacheDumpDir string) (db AccountingStorage, err error) { +func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler, cacheDumpDir string, loadHistorySize int) (db AccountingStorage, err error) { var d AccountingStorage switch db_type { case utils.REDIS: @@ -68,9 +68,9 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler if port != "" { host += ":" + port } - d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheDumpDir) + d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheDumpDir, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheDumpDir) + d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheDumpDir, loadHistorySize) db = d.(AccountingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -99,7 +99,7 @@ func ConfigureLogStorage(db_type, host, port, name, user, pass, marshaler string d, err = NewRedisStorage(host, db_nb, pass, marshaler) */ case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, "") + d, err = NewMongoStorage(host, port, name, user, pass, nil, "", 1) case utils.POSTGRES: d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MYSQL: @@ -122,7 +122,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, "") + d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, "", 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) @@ -141,7 +141,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, "") + d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, "", 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index c46e92026..8ca8efe23 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -14,7 +14,6 @@ import ( type TpReader struct { tpid string timezone string - loadHistSize int ratingStorage RatingStorage accountingStorage AccountingStorage lr LoadReader @@ -36,14 +35,12 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias - //loadInstance *utils.LoadInstance } -func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string, loadHistSize int) *TpReader { +func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string) *TpReader { tpr := &TpReader{ tpid: tpid, timezone: timezone, - loadHistSize: loadHistSize, ratingStorage: rs, accountingStorage: as, lr: lr, @@ -1616,13 +1613,6 @@ func (tpr *TpReader) IsValid() bool { return valid } -/*func (tpr *TpReader) GetLoadInstance() *utils.LoadInstance { - if tpr.loadInstance == nil { - tpr.loadInstance = &utils.LoadInstance{LoadId: utils.GenUUID(), TariffPlanId: tpr.tpid, LoadTime: time.Now()} - } - return tpr.loadInstance -}*/ - func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { if tpr.ratingStorage == nil || tpr.accountingStorage == nil { return errors.New("no database connection") @@ -1815,15 +1805,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("\t", al.GetId()) } } - /* - ldInst := tpr.GetLoadInstance() - if verbose { - log.Printf("LoadHistory, instance: %+v\n", ldInst) - } - if err = tpr.accountingStorage.AddLoadHistory(ldInst, tpr.loadHistSize); err != nil { - return err - } - */ return }