From be388e0aa466f14c76767b9b4a0dfff1292e2b66 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 20 Nov 2017 13:11:02 +0200 Subject: [PATCH] LCRProfile (1) --- engine/datamanager.go | 45 ++++++++++- engine/lcrs.go | 5 ++ engine/libtest.go | 2 +- engine/model_helpers.go | 139 ++++++++++++++++++++++++++++++++- engine/models.go | 16 ++++ engine/storage_interface.go | 5 ++ engine/storage_map.go | 32 ++++++++ engine/storage_mongo_datadb.go | 97 ++++++++++++++++------- engine/storage_mongo_stordb.go | 31 ++++++++ engine/storage_redis.go | 34 +++++++- engine/storage_sql.go | 22 ++++++ engine/tp_reader.go | 96 ++++++++++++++++++++++- utils/apitpdata.go | 14 ++++ utils/consts.go | 5 ++ 14 files changed, 506 insertions(+), 37 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index 141c1447d..d5e826632 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -38,7 +38,7 @@ func (dm *DataManager) DataDB() DataDB { return dm.dataDB } -func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rpIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs []string) (err error) { +func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rpIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, lcrPrfIDs []string) (err error) { if dm.DataDB().GetStorageType() == utils.MAPSTOR { if dm.cacheCfg == nil { return @@ -49,7 +49,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.LCR_PREFIX, utils.CDR_STATS_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACTION_TRIGGER_PREFIX, utils.SHARED_GROUP_PREFIX, utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.StatQueuePrefix, utils.StatQueueProfilePrefix, - utils.ThresholdPrefix, utils.ThresholdProfilePrefix, utils.FilterPrefix}, k) && cacheCfg.Precache { + utils.ThresholdPrefix, utils.ThresholdProfilePrefix, utils.FilterPrefix, utils.LCRProfilePrefix}, k) && cacheCfg.Precache { if err := dm.PreloadCacheForPrefix(k); err != nil && err != utils.ErrInvalidKey { return err } @@ -78,6 +78,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, utils.ThresholdPrefix: thIDs, utils.ThresholdProfilePrefix: thpIDs, utils.FilterPrefix: fltrIDs, + utils.LCRProfilePrefix: lcrPrfIDs, } { if err = dm.CacheDataFromDB(key, ids, false); err != nil { return @@ -134,7 +135,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, - utils.FilterPrefix}, prfx) { + utils.FilterPrefix, + utils.LCRProfilePrefix}, prfx) { return utils.NewCGRError(utils.MONGO, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -220,6 +222,9 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetFilter(tntID.Tenant, tntID.ID, true, utils.NonTransactional) + case utils.LCRProfilePrefix: + tntID := utils.NewTenantID(dataID) + _, err = dm.GetLCRProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.MONGO, @@ -815,3 +820,37 @@ func (dm *DataManager) GetCdrStats(key string) (cs *CdrStats, err error) { func (dm *DataManager) GetAllCdrStats() (css []*CdrStats, err error) { return dm.DataDB().GetAllCdrStatsDrv() } + +func (dm *DataManager) GetLCRProfile(tenant, id string, skipCache bool, transactionID string) (lcrprf *LCRProfile, err error) { + key := utils.LCRProfilePrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*LCRProfile), nil + } + } + lcrprf, err = dm.dataDB.GetLCRProfileDrv(tenant, id) + if err != nil { + if err == utils.ErrNotFound { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, lcrprf, cacheCommit(transactionID), transactionID) + return +} + +func (dm *DataManager) SetLCRProfile(lcrprf *LCRProfile) (err error) { + return dm.DataDB().SetLCRProfileDrv(lcrprf) +} + +func (dm *DataManager) RemoveLCRProfile(tenant, id, transactionID string) (err error) { + if err = dm.DataDB().RemoveLCRProfileDrv(tenant, id); err != nil { + return + } + cache.RemKey(utils.LCRProfilePrefix+utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/lcrs.go b/engine/lcrs.go index dcf9d7dad..cc9f0cf5e 100644 --- a/engine/lcrs.go +++ b/engine/lcrs.go @@ -35,3 +35,8 @@ type LCRProfile struct { StatIDs []string // StatProfiles queried in case of QoS based strategies Weight float64 } + +// TenantID returns unique identifier of the LCRProfile in a multi-tenant environment +func (rp *LCRProfile) TenantID() string { + return utils.ConcatenatedKey(rp.Tenant, rp.ID) +} diff --git a/engine/libtest.go b/engine/libtest.go index 66df14ab5..62d0ffda2 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -41,7 +41,7 @@ func InitDataDb(cfg *config.CGRConfig) error { if err := dm.DataDB().Flush(""); err != nil { return err } - dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) // Write version before starting if err := SetDBVersions(dm.dataDB); err != nil { return err diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 7b760ec11..decc8b6bc 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2371,7 +2371,6 @@ func APItoModelTPFilter(th *utils.TPFilter) (mdls TpFilterS) { mdls = append(mdls, mdl) } return - } func APItoFilter(tpTH *utils.TPFilter, timezone string) (th *Filter, err error) { @@ -2419,3 +2418,141 @@ func FilterToTPFilter(f *Filter) (tpFltr *utils.TPFilter) { } return } + +type TpLCRProfiles []*TpLCRProfile + +func (tps TpLCRProfiles) AsTPLCRProfile() (result []*utils.TPLCRProfile) { + mst := make(map[string]*utils.TPLCRProfile) + for _, tp := range tps { + th, found := mst[tp.ID] + if !found { + th = &utils.TPLCRProfile{ + TPid: tp.Tpid, + Tenant: tp.Tenant, + ID: tp.ID, + Strategy: tp.Strategy, + SupplierID: tp.SupplierID, + } + } + if tp.StrategyParams != "" { + strategyParamSplit := strings.Split(tp.StrategyParams, utils.INFIELD_SEP) + for _, strategyParam := range strategyParamSplit { + th.StrategyParams = append(th.StrategyParams, strategyParam) + } + } + if tp.RatingPlanIDs != "" { + ratingPlansIDsSplit := strings.Split(tp.RatingPlanIDs, utils.INFIELD_SEP) + for _, ratingPlanID := range ratingPlansIDsSplit { + th.RatingPlanIDs = append(th.RatingPlanIDs, ratingPlanID) + } + } + if tp.StatIDs != "" { + statIDsSplit := strings.Split(tp.StatIDs, utils.INFIELD_SEP) + for _, statID := range statIDsSplit { + th.StatIDs = append(th.StatIDs, statID) + } + } + if tp.Weight != 0 { + th.Weight = tp.Weight + } + if len(tp.ActivationInterval) != 0 { + th.ActivationInterval = new(utils.TPActivationInterval) + aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) + if len(aiSplt) == 2 { + th.ActivationInterval.ActivationTime = aiSplt[0] + th.ActivationInterval.ExpiryTime = aiSplt[1] + } else if len(aiSplt) == 1 { + th.ActivationInterval.ActivationTime = aiSplt[0] + } + } + if tp.FilterIDs != "" { + filterSplit := strings.Split(tp.FilterIDs, utils.INFIELD_SEP) + for _, filter := range filterSplit { + th.FilterIDs = append(th.FilterIDs, filter) + } + } + + mst[tp.ID] = th + } + result = make([]*utils.TPLCRProfile, len(mst)) + i := 0 + for _, th := range mst { + result[i] = th + i++ + } + return +} + +func APItoModelTPLCRProfile(st *utils.TPLCRProfile) (mdls TpLCRProfiles) { + if st != nil { + for i, fltr := range st.FilterIDs { + mdl := &TpLCRProfile{ + Tenant: st.Tenant, + Tpid: st.TPid, + ID: st.ID, + } + if i == 0 { + mdl.Strategy = st.Strategy + mdl.Weight = st.Weight + mdl.SupplierID = st.SupplierID + for i, val := range st.StrategyParams { + if i != 0 { + mdl.StrategyParams += utils.INFIELD_SEP + } + mdl.StrategyParams += val + } + for i, val := range st.RatingPlanIDs { + if i != 0 { + mdl.RatingPlanIDs += utils.INFIELD_SEP + } + mdl.RatingPlanIDs += val + } + for i, val := range st.StatIDs { + if i != 0 { + mdl.StatIDs += utils.INFIELD_SEP + } + mdl.StatIDs += val + } + if st.ActivationInterval != nil { + if st.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = st.ActivationInterval.ActivationTime + } + if st.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + st.ActivationInterval.ExpiryTime + } + } + } + mdl.FilterIDs = fltr + mdls = append(mdls, mdl) + } + } + return +} + +func APItoLCRProfile(tpTH *utils.TPLCRProfile, timezone string) (th *LCRProfile, err error) { + th = &LCRProfile{ + Tenant: tpTH.Tenant, + ID: tpTH.ID, + Strategy: tpTH.Strategy, + SupplierID: tpTH.SupplierID, + Weight: tpTH.Weight, + } + for _, stp := range tpTH.StrategyParams { + th.StrategyParams = append(th.StrategyParams, stp) + } + for _, fli := range tpTH.FilterIDs { + th.FilterIDs = append(th.FilterIDs, fli) + } + if tpTH.ActivationInterval != nil { + if th.ActivationInterval, err = tpTH.ActivationInterval.AsActivationInterval(timezone); err != nil { + return nil, err + } + } + for _, rpl := range tpTH.RatingPlanIDs { + th.RatingPlanIDs = append(th.RatingPlanIDs, rpl) + } + for _, sts := range tpTH.StatIDs { + th.StatIDs = append(th.StatIDs, sts) + } + return th, nil +} diff --git a/engine/models.go b/engine/models.go index ded146bd6..8af59b9aa 100755 --- a/engine/models.go +++ b/engine/models.go @@ -524,3 +524,19 @@ type TpFilter struct { ActivationInterval string `index:"5" re:""` CreatedAt time.Time } + +type TpLCRProfile struct { + PK uint `gorm:"primary_key"` + Tpid string + Tenant string `index:"0" re:""` + ID string `index:"1" re:""` + FilterIDs string `index:"2" re:""` + ActivationInterval string `index:"3" re:""` + Strategy string `index:"4" re:""` + StrategyParams string `index:"5" re:""` + SupplierID string `index:"6" re:""` + RatingPlanIDs string `index:"7" re:""` + StatIDs string `index:"8" re:""` + Weight float64 `index:"9" re:"\d+\.?\d*"` + CreatedAt time.Time +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index d8b2e6297..3d96f851d 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -128,6 +128,9 @@ type DataDB interface { GetFilterDrv(string, string) (*Filter, error) SetFilterDrv(*Filter) error RemoveFilterDrv(string, string) error + GetLCRProfileDrv(string, string) (*LCRProfile, error) + SetLCRProfileDrv(*LCRProfile) error + RemoveLCRProfileDrv(string, string) error } type StorDB interface { @@ -175,6 +178,7 @@ type LoadReader interface { GetTPStats(string, string) ([]*utils.TPStats, error) GetTPThresholds(string, string) ([]*utils.TPThreshold, error) GetTPFilters(string, string) ([]*utils.TPFilter, error) + GetTPLCRProfiles(string, string) ([]*utils.TPLCRProfile, error) } type LoadWriter interface { @@ -199,6 +203,7 @@ type LoadWriter interface { SetTPStats([]*utils.TPStats) error SetTPThresholds([]*utils.TPThreshold) error SetTPFilters([]*utils.TPFilter) error + SetTPLCRProfiles([]*utils.TPLCRProfile) error } // NewMarshaler returns the marshaler type selected by mrshlerStr diff --git a/engine/storage_map.go b/engine/storage_map.go index 39c76ffad..23678385c 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1380,7 +1380,39 @@ func (ms *MapStorage) RemoveFilterDrv(tenant, id string) (err error) { defer ms.mu.Unlock() key := utils.FilterPrefix + utils.ConcatenatedKey(tenant, id) delete(ms.dict, key) + return +} +func (ms *MapStorage) GetLCRProfileDrv(tenant, id string) (r *LCRProfile, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + values, ok := ms.dict[utils.LCRProfilePrefix+utils.ConcatenatedKey(tenant, id)] + if !ok { + return nil, utils.ErrNotFound + } + err = ms.ms.Unmarshal(values, &r) + if err != nil { + return nil, err + } + return +} + +func (ms *MapStorage) SetLCRProfileDrv(r *LCRProfile) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + result, err := ms.ms.Marshal(r) + if err != nil { + return err + } + ms.dict[utils.LCRProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result + return +} + +func (ms *MapStorage) RemoveLCRProfileDrv(tenant, id string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + key := utils.LCRProfilePrefix + utils.ConcatenatedKey(tenant, id) + delete(ms.dict, key) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 13895ac9c..ac7590df1 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -34,36 +34,37 @@ import ( ) const ( - colDst = "destinations" - colRds = "reverse_destinations" - colAct = "actions" - colApl = "action_plans" - colAAp = "account_action_plans" - colTsk = "tasks" - colAtr = "action_triggers" - colRpl = "rating_plans" - colRpf = "rating_profiles" - colAcc = "accounts" - colShg = "shared_groups" - colLcr = "lcr_rules" - colDcs = "derived_chargers" - colAls = "aliases" - colRCfgs = "reverse_aliases" - colStq = "stat_qeues" - colPbs = "pubsub" - colUsr = "users" - colCrs = "cdr_stats" - colLht = "load_history" - colVer = "versions" - colRsP = "resource_profiles" - colRFI = "request_filter_indexes" - colTmg = "timings" - colRes = "resources" - colSqs = "statqueues" - colSqp = "statqueue_profiles" - colTps = "threshold_profiles" - colThs = "thresholds" - colFlt = "filters" + colDst = "destinations" + colRds = "reverse_destinations" + colAct = "actions" + colApl = "action_plans" + colAAp = "account_action_plans" + colTsk = "tasks" + colAtr = "action_triggers" + colRpl = "rating_plans" + colRpf = "rating_profiles" + colAcc = "accounts" + colShg = "shared_groups" + colLcr = "lcr_rules" + colDcs = "derived_chargers" + colAls = "aliases" + colRCfgs = "reverse_aliases" + colStq = "stat_qeues" + colPbs = "pubsub" + colUsr = "users" + colCrs = "cdr_stats" + colLht = "load_history" + colVer = "versions" + colRsP = "resource_profiles" + colRFI = "request_filter_indexes" + colTmg = "timings" + colRes = "resources" + colSqs = "statqueues" + colSqp = "statqueue_profiles" + colTps = "threshold_profiles" + colThs = "thresholds" + colFlt = "filters" + colLcrPrf = "lcr_profiles" ) var ( @@ -573,6 +574,11 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er for iter.Next(&idResult) { result = append(result, utils.ThresholdProfilePrefix+utils.ConcatenatedKey(idResult.Tenant, idResult.Id)) } + case utils.LCRProfilePrefix: + iter := db.C(colLcrPrf).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"tenant": 1, "id": 1}).Iter() + for iter.Next(&idResult) { + result = append(result, utils.LCRProfilePrefix+utils.ConcatenatedKey(idResult.Tenant, idResult.Id)) + } default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -615,6 +621,9 @@ func (ms *MongoStorage) HasDataDrv(category, subject string) (has bool, err erro case utils.FilterPrefix: count, err = db.C(colFlt).Find(bson.M{"id": subject}).Count() has = count > 0 + case utils.LCRProfilePrefix: + count, err = db.C(colLcrPrf).Find(bson.M{"id": subject}).Count() + has = count > 0 default: err = fmt.Errorf("unsupported category in HasData: %s", category) } @@ -1965,3 +1974,31 @@ func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) (err error) { } return nil } + +func (ms *MongoStorage) GetLCRProfileDrv(tenant, id string) (r *LCRProfile, err error) { + session, col := ms.conn(colLcrPrf) + defer session.Close() + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return nil, err + } + return +} + +func (ms *MongoStorage) SetLCRProfileDrv(r *LCRProfile) (err error) { + session, col := ms.conn(colLcrPrf) + defer session.Close() + _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) + return +} + +func (ms *MongoStorage) RemoveLCRProfileDrv(tenant, id string) (err error) { + session, col := ms.conn(colLcrPrf) + defer session.Close() + if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil { + return + } + return nil +} diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 4e63bf9b8..7dce8dec6 100755 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -1245,6 +1245,37 @@ func (ms *MongoStorage) SetTPFilters(tpTHs []*utils.TPFilter) (err error) { return } +func (ms *MongoStorage) GetTPLCRProfiles(tpid, id string) ([]*utils.TPLCRProfile, error) { + filter := bson.M{ + "tpid": tpid, + } + if id != "" { + filter["id"] = id + } + var results []*utils.TPLCRProfile + session, col := ms.conn(utils.TBLTPLCRProfiles) + defer session.Close() + err := col.Find(filter).All(&results) + if len(results) == 0 { + return results, utils.ErrNotFound + } + return results, err +} + +func (ms *MongoStorage) SetTPLCRProfiles(tpTHs []*utils.TPLCRProfile) (err error) { + if len(tpTHs) == 0 { + return + } + session, col := ms.conn(utils.TBLTPLCRProfiles) + defer session.Close() + tx := col.Bulk() + for _, tp := range tpTHs { + tx.Upsert(bson.M{"tpid": tp.TPid, "id": tp.ID}, tp) + } + _, err = tx.Run() + return +} + func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { session, col := ms.conn(colVer) defer session.Close() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 7e7470b8a..039e291d8 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -208,7 +208,8 @@ func (rs *RedisStorage) HasDataDrv(category, subject string) (bool, error) { switch category { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, - utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix, utils.FilterPrefix: + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix, + utils.FilterPrefix, utils.LCRProfilePrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } @@ -1520,6 +1521,37 @@ func (rs *RedisStorage) RemoveFilterDrv(tenant, id string) (err error) { return } +func (rs *RedisStorage) GetLCRProfileDrv(tenant, id string) (r *LCRProfile, err error) { + key := utils.LCRProfilePrefix + utils.ConcatenatedKey(tenant, id) + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { // did not find the destination + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &r); err != nil { + return + } + return +} + +func (rs *RedisStorage) SetLCRProfileDrv(r *LCRProfile) (err error) { + result, err := rs.ms.Marshal(r) + if err != nil { + return err + } + return rs.Cmd("SET", utils.LCRProfilePrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err +} + +func (rs *RedisStorage) RemoveLCRProfileDrv(tenant, id string) (err error) { + key := utils.LCRProfilePrefix + utils.ConcatenatedKey(tenant, id) + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } + return +} + func (rs *RedisStorage) GetStorageType() string { return utils.REDIS } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 55c97e84c..b1b1a5eab 100755 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -689,6 +689,28 @@ func (self *SQLStorage) SetTPFilters(ths []*utils.TPFilter) error { return nil } +func (self *SQLStorage) SetTPLCRProfile(ths []*utils.TPLCRProfile) error { + if len(ths) == 0 { + return nil + } + tx := self.db.Begin() + for _, th := range ths { + // Remove previous + if err := tx.Where(&TpLCRProfile{Tpid: th.TPid, ID: th.ID}).Delete(TpLCRProfile{}).Error; err != nil { + tx.Rollback() + return err + } + for _, mst := range APItoModelTPLCRProfile(th) { + if err := tx.Save(&mst).Error; err != nil { + tx.Rollback() + return err + } + } + } + tx.Commit() + return nil +} + func (self *SQLStorage) SetSMCost(smc *SMCost) error { if smc.CostDetails == nil { return nil diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 05267871f..90acbcff6 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -57,15 +57,18 @@ type TpReader struct { sqProfiles map[utils.TenantID]*utils.TPStats thProfiles map[utils.TenantID]*utils.TPThreshold filters map[utils.TenantID]*utils.TPFilter + lcrProfiles map[utils.TenantID]*utils.TPLCRProfile resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles + lcrTntID []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles revDests, revAliases, acntActionPlans map[string][]string thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer sqpIndexers map[string]*ReqFilterIndexer // tenant, indexer resIndexers map[string]*ReqFilterIndexer // tenant, indexer + lcrIndexers map[string]*ReqFilterIndexer // tenant, indexer } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader { @@ -136,6 +139,7 @@ func (tpr *TpReader) Init() { tpr.resProfiles = make(map[utils.TenantID]*utils.TPResource) tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStats) tpr.thProfiles = make(map[utils.TenantID]*utils.TPThreshold) + tpr.lcrProfiles = make(map[utils.TenantID]*utils.TPLCRProfile) tpr.filters = make(map[utils.TenantID]*utils.TPFilter) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) @@ -143,6 +147,7 @@ func (tpr *TpReader) Init() { tpr.thdsIndexers = make(map[string]*ReqFilterIndexer) tpr.sqpIndexers = make(map[string]*ReqFilterIndexer) tpr.resIndexers = make(map[string]*ReqFilterIndexer) + tpr.lcrIndexers = make(map[string]*ReqFilterIndexer) } func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { @@ -1760,6 +1765,53 @@ func (tpr *TpReader) LoadFilters() error { return tpr.LoadFiltersFiltered("") } +func (tpr *TpReader) LoadLCRProfilesFiltered(tag string) (err error) { + rls, err := tpr.lr.GetTPLCRProfiles(tpr.tpid, tag) + if err != nil { + return err + } + mapRsPfls := make(map[utils.TenantID]*utils.TPLCRProfile) + for _, rl := range rls { + mapRsPfls[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl + } + tpr.lcrProfiles = mapRsPfls + for tntID, res := range mapRsPfls { + resIndxrKey := utils.LCRProfilesStringIndex + tntID.Tenant + if has, err := tpr.dm.HasData(utils.LCRProfilePrefix, tntID.TenantID()); err != nil { + return err + } else if !has { + tpr.lcrTntID = append(tpr.lcrTntID, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID}) + } + // index resource for filters + if _, has := tpr.lcrIndexers[tntID.Tenant]; !has { + if tpr.lcrIndexers[tntID.Tenant], err = NewReqFilterIndexer(tpr.dm, resIndxrKey); err != nil { + return + } + } + for _, fltrID := range res.FilterIDs { + tpFltr, has := tpr.filters[utils.TenantID{Tenant: tntID.Tenant, ID: fltrID}] + if !has { + var fltr *Filter + if fltr, err = tpr.dm.GetFilter(tntID.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for resoruce: %+v", fltrID, res) + } + return + } else { + tpFltr = FilterToTPFilter(fltr) + } + } else { + tpr.lcrIndexers[tntID.Tenant].IndexTPFilter(tpFltr, res.ID) + } + } + } + return nil +} + +func (tpr *TpReader) LoadLCRProfiles() error { + return tpr.LoadLCRProfilesFiltered("") +} + func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps { return @@ -1821,6 +1873,9 @@ func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadThresholds(); err != nil && err.Error() != utils.NotFoundCaps { return } + if err = tpr.LoadLCRProfiles(); err != nil && err.Error() != utils.NotFoundCaps { + return + } return nil } @@ -2155,6 +2210,23 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", thd.TenantID()) } } + + if verbose { + log.Print("LCRProfiles:") + } + for _, tpTH := range tpr.lcrProfiles { + th, err := APItoLCRProfile(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dm.SetLCRProfile(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.TenantID()) + } + } + if verbose { log.Print("Timings:") } @@ -2226,6 +2298,18 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice()) } } + + if verbose { + log.Print("Indexing LCRProfiles") + } + for tenant, fltrIdxer := range tpr.lcrIndexers { + if err := fltrIdxer.StoreIndexes(); err != nil { + return err + } + if verbose { + log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice()) + } + } } return } @@ -2287,7 +2371,7 @@ func (tpr *TpReader) ShowStatistics() { log.Print("LCR rules: ", len(tpr.lcrs)) // cdr stats log.Print("CDR stats: ", len(tpr.cdrStats)) - // resource limits + // resource profiles log.Print("ResourceProfiles: ", len(tpr.resProfiles)) // stats log.Print("Stats: ", len(tpr.sqProfiles)) @@ -2295,6 +2379,8 @@ func (tpr *TpReader) ShowStatistics() { log.Print("Thresholds: ", len(tpr.thProfiles)) // filters log.Print("Filters: ", len(tpr.filters)) + // LCR profiles + log.Print("LCRProfiles: ", len(tpr.lcrProfiles)) } // Returns the identities loaded for a specific category, useful for cache reloads @@ -2452,6 +2538,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case utils.LCRProfilePrefix: + keys := make([]string, len(tpr.lcrProfiles)) + i := 0 + for k, _ := range tpr.lcrProfiles { + keys[i] = k.TenantID() + i++ + } + return keys, nil } return nil, errors.New("Unsupported load category") } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 8523b56a6..1760db272 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1383,3 +1383,17 @@ type TPRequestFilter struct { FieldName string // Name of the field providing us the Values to check (used in case of some ) Values []string // Filter definition } + +type TPLCRProfile struct { + TPid string + Tenant string + ID string + FilterIDs []string + ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires + Strategy string + StrategyParams []string + SupplierID string + RatingPlanIDs []string // RatingPlans used when computing price + StatIDs []string // StatProfiles queried in case of QoS based strategies + Weight float64 +} diff --git a/utils/consts.go b/utils/consts.go index 155718555..337a692ec 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -64,6 +64,7 @@ var ( CacheThresholdProfiles: ThresholdProfilePrefix, CacheThresholds: ThresholdPrefix, CacheFilters: FilterPrefix, + CacheLCRProfiles: LCRProfilePrefix, } CachePrefixToInstance map[string]string // will be built on init ) @@ -112,6 +113,7 @@ const ( TBLTPStats = "tp_stats" TBLTPThresholds = "tp_thresholds" TBLTPFilters = "tp_filters" + TBLTPLCRProfiles = "tp_lcr" TBLSMCosts = "sm_costs" TBLCDRs = "cdrs" TBLVersions = "versions" @@ -265,6 +267,8 @@ const ( LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" StatQueueProfilePrefix = "sqp_" + LCRProfilePrefix = "lcp_" + LCRProfilesStringIndex = "lci_" ThresholdProfilePrefix = "thp_" StatQueuePrefix = "stq_" LOADINST_KEY = "load_history" @@ -480,6 +484,7 @@ const ( CacheThresholdProfiles = "threshold_profiles" CacheThresholds = "thresholds" CacheFilters = "filters" + CacheLCRProfiles = "lcr_profiles" AccountUpdate = "AccountUpdate" BalanceUpdate = "BalanceUpdate" StatUpdate = "StatUpdate"