From cf067eda4bbbf4e1c5dfff87cef142c894bc7626 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 12 Oct 2015 20:20:58 +0300 Subject: [PATCH] tested mongo tp wit cgradmin formal test pending --- cmd/cgr-engine/cgr-engine.go | 2 +- engine/storage_mongo.go | 89 +++++------------- engine/storage_mongo_tp.go | 172 ++++++++++++++++++++++++----------- 3 files changed, 140 insertions(+), 123 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5cd7aeca2..22bfa2ba7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -558,7 +558,7 @@ func main() { logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns) if err != nil { // Cannot configure logger database, show stopper - utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting! (%+v)", err, cfg)) + utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) return } defer logDb.Close() diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 149d38da4..34344c830 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "strings" - "time" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -33,26 +32,28 @@ import ( ) const ( - colDst = "destinations" - colAct = "actions" - colApl = "actionplans" - colAtr = "actiontriggers" - colRpl = "ratingplans" - colRpf = "ratingprofiles" - colAcc = "accounts" - colShg = "sharedgroups" - colLcr = "lcrrules" - colDcs = "derivedchargers" - colAls = "aliases" - colStq = "statsqeues" - colPbs = "pubsub" - colUsr = "users" - colCrs = "cdrstats" - colLht = "loadhistory" - colLogCC = "callcostlogs" - colLogAtr = "actiontriggerslogs" - colLogApl = "actionplanlogs" - colLogErr = "errorlogs" + colDst = "destinations" + colAct = "actions" + colApl = "actionplans" + colAtr = "actiontriggers" + colRpl = "ratingplans" + colRpf = "ratingprofiles" + colAcc = "accounts" + colShg = "sharedgroups" + colLcr = "lcrrules" + colDcs = "derivedchargers" + colAls = "aliases" + colStq = "statsqeues" + colPbs = "pubsub" + colUsr = "users" + colCrs = "cdrstats" + colLht = "loadhistory" + colLogCC = "callcostlogs" + colLogAtr = "actiontriggerslogs" + colLogApl = "actionplanlogs" + colLogErr = "errorlogs" + colCdrs = "cdrs" + colRatedCdrs = "ratedcdrs" ) type MongoStorage struct { @@ -123,33 +124,6 @@ func (ms *MongoStorage) Flush(ignore string) (err error) { return nil } -type LogCostEntry struct { - Id string `bson:"_id,omitempty"` - CallCost *CallCost - Source string -} - -type LogTimingEntry struct { - ActionPlan *ActionPlan - Actions Actions - LogTime time.Time - Source string -} - -type LogTriggerEntry struct { - ubId string - ActionTrigger *ActionTrigger - Actions Actions - LogTime time.Time - Source string -} - -type LogErrEntry struct { - Id string `bson:"_id,omitempty"` - ErrStr string - Source string -} - func (ms *MongoStorage) CacheRatingAll() error { return ms.cacheRating(nil, nil, nil, nil, nil, nil, nil) } @@ -1041,22 +1015,3 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { err = iter.Close() return } - -func (ms *MongoStorage) LogCallCost(cgrid, source string, cc *CallCost) error { - return ms.db.C(colLogCC).Insert(&LogCostEntry{cgrid, cc, source}) -} - -func (ms *MongoStorage) GetCallCostLog(cgrid, source string) (cc *CallCost, err error) { - result := new(LogCostEntry) - err = ms.db.C(colLogCC).Find(bson.M{"_id": cgrid, "source": source}).One(result) - cc = result.CallCost - return -} - -func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - return ms.db.C(colLogAtr).Insert(&LogTriggerEntry{ubId, at, as, time.Now(), source}) -} - -func (ms *MongoStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { - return ms.db.C(colLogApl).Insert(&LogTimingEntry{at, as, time.Now(), source}) -} diff --git a/engine/storage_mongo_tp.go b/engine/storage_mongo_tp.go index 4f345d5f1..77a5af084 100644 --- a/engine/storage_mongo_tp.go +++ b/engine/storage_mongo_tp.go @@ -2,32 +2,12 @@ package engine import ( "strings" + "time" "github.com/cgrates/cgrates/utils" "gopkg.in/mgo.v2/bson" ) -const ( - colTpTmg = "tp_timings" - colTpDst = "tp_destinations" - colTpRts = "tp_rates" - colTpDrs = "tp_destinationrates" - colTpRpl = "tp_ratingplans" - colTpRpf = "tp_ratingprofiles" - colTpAct = "tp_actions" - colTpApl = "tp_actionplans" - colTpAtr = "tp_actiontriggers" - colTpAcc = "tp_accounts" - colTpShg = "tp_sharedgroups" - colTpLcr = "tp_lcrrules" - colTpDcs = "tp_derivedchargers" - colTpAls = "tp_aliases" - colTpStq = "tp_statsqeues" - colTpPbs = "tp_pubsub" - colTpUsr = "tp_users" - colTpCrs = "tp_cdrstats" -) - func (ms *MongoStorage) GetTpIds() ([]string, error) { tpidMap := make(map[string]bool) cols, err := ms.db.CollectionNames() @@ -56,8 +36,24 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti for _, d := range distinct { selectors[d] = 1 } - filter["tpid"] = tpid - q := ms.db.C(table).Find(filter) + findMap := make(map[string]interface{}) + if tpid != "" { + findMap["tpid"] = tpid + } + for k, v := range filter { + findMap[k] = v + } + + if pag != nil && pag.SearchTerm != "" { + var searchItems []bson.M + for _, d := range distinct { + searchItems = append(searchItems, bson.M{d: bson.RegEx{ + Pattern: ".*" + pag.SearchTerm + ".*", + Options: ""}}) + } + findMap["$and"] = []bson.M{bson.M{"$or": searchItems}} + } + q := ms.db.C(table).Find(findMap) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -101,7 +97,7 @@ func (ms *MongoStorage) GetTpTimings(tpid, tag string) ([]TpTiming, error) { filter["tag"] = tag } var results []TpTiming - err := ms.db.C(colTpTmg).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_TIMINGS).Find(filter).All(&results) return results, err } @@ -113,7 +109,7 @@ func (ms *MongoStorage) GetTpDestinations(tpid, tag string) ([]TpDestination, er filter["tag"] = tag } var results []TpDestination - err := ms.db.C(colTpDst).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_DESTINATIONS).Find(filter).All(&results) return results, err } @@ -125,7 +121,7 @@ func (ms *MongoStorage) GetTpRates(tpid, tag string) ([]TpRate, error) { filter["tag"] = tag } var results []TpRate - err := ms.db.C(colTpRts).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_RATES).Find(filter).All(&results) return results, err } @@ -137,7 +133,7 @@ func (ms *MongoStorage) GetTpDestinationRates(tpid, tag string, pag *utils.Pagin filter["tag"] = tag } var results []TpDestinationRate - q := ms.db.C(colTpDrs).Find(filter) + q := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Find(filter) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -158,7 +154,7 @@ func (ms *MongoStorage) GetTpRatingPlans(tpid, tag string, pag *utils.Paginator) filter["tag"] = tag } var results []TpRatingPlan - q := ms.db.C(colTpRpl).Find(filter) + q := ms.db.C(utils.TBL_TP_RATING_PLANS).Find(filter) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -189,7 +185,7 @@ func (ms *MongoStorage) GetTpRatingProfiles(tp *TpRatingProfile) ([]TpRatingProf filter["loadid"] = tp.Loadid } var results []TpRatingProfile - err := ms.db.C(colTpRpf).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_RATE_PROFILES).Find(filter).All(&results) return results, err } @@ -201,7 +197,7 @@ func (ms *MongoStorage) GetTpSharedGroups(tpid, tag string) ([]TpSharedGroup, er filter["tag"] = tag } var results []TpSharedGroup - err := ms.db.C(colTpShg).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Find(filter).All(&results) return results, err } @@ -213,7 +209,7 @@ func (ms *MongoStorage) GetTpCdrStats(tpid, tag string) ([]TpCdrstat, error) { filter["tag"] = tag } var results []TpCdrstat - err := ms.db.C(colTpCrs).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_CDR_STATS).Find(filter).All(&results) return results, err } func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) { @@ -234,7 +230,7 @@ func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) { filter["subject"] = tp.Subject } var results []TpLcrRule - err := ms.db.C(colTpLcr).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_LCRS).Find(filter).All(&results) return results, err } @@ -247,7 +243,7 @@ func (ms *MongoStorage) GetTpUsers(tp *TpUser) ([]TpUser, error) { filter["username"] = tp.UserName } var results []TpUser - err := ms.db.C(colTpUsr).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_USERS).Find(filter).All(&results) return results, err } @@ -272,7 +268,7 @@ func (ms *MongoStorage) GetTpAliases(tp *TpAlias) ([]TpAlias, error) { filter["context"] = tp.Context } var results []TpAlias - err := ms.db.C(colTpAls).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_ALIASES).Find(filter).All(&results) return results, err } @@ -297,7 +293,7 @@ func (ms *MongoStorage) GetTpDerivedChargers(tp *TpDerivedCharger) ([]TpDerivedC filter["loadid"] = tp.Loadid } var results []TpDerivedCharger - err := ms.db.C(colTpDcs).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Find(filter).All(&results) return results, err } @@ -309,7 +305,7 @@ func (ms *MongoStorage) GetTpActions(tpid, tag string) ([]TpAction, error) { filter["tag"] = tag } var results []TpAction - err := ms.db.C(colTpAct).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_ACTIONS).Find(filter).All(&results) return results, err } @@ -321,7 +317,7 @@ func (ms *MongoStorage) GetTpActionPlans(tpid, tag string) ([]TpActionPlan, erro filter["tag"] = tag } var results []TpActionPlan - err := ms.db.C(colTpApl).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_ACTION_PLANS).Find(filter).All(&results) return results, err } @@ -333,7 +329,7 @@ func (ms *MongoStorage) GetTpActionTriggers(tpid, tag string) ([]TpActionTrigger filter["tag"] = tag } var results []TpActionTrigger - err := ms.db.C(colTpAtr).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Find(filter).All(&results) return results, err } @@ -352,7 +348,7 @@ func (ms *MongoStorage) GetTpAccountActions(tp *TpAccountAction) ([]TpAccountAct filter["loadid"] = tp.Loadid } var results []TpAccountAction - err := ms.db.C(colTpAcc).Find(filter).All(&results) + err := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Find(filter).All(&results) return results, err } @@ -381,7 +377,7 @@ func (ms *MongoStorage) SetTpTimings(tps []TpTiming) error { } m := make(map[string]bool) - tx := ms.db.C(colTpTmg).Bulk() + tx := ms.db.C(utils.TBL_TP_TIMINGS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -398,7 +394,7 @@ func (ms *MongoStorage) SetTpDestinations(tps []TpDestination) error { } m := make(map[string]bool) - tx := ms.db.C(colTpDst).Bulk() + tx := ms.db.C(utils.TBL_TP_DESTINATIONS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -415,7 +411,7 @@ func (ms *MongoStorage) SetTpRates(tps []TpRate) error { } m := make(map[string]bool) - tx := ms.db.C(colTpRts).Bulk() + tx := ms.db.C(utils.TBL_TP_RATES).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -432,7 +428,7 @@ func (ms *MongoStorage) SetTpDestinationRates(tps []TpDestinationRate) error { } m := make(map[string]bool) - tx := ms.db.C(colTpDrs).Bulk() + tx := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -449,7 +445,7 @@ func (ms *MongoStorage) SetTpRatingPlans(tps []TpRatingPlan) error { } m := make(map[string]bool) - tx := ms.db.C(colTpRpl).Bulk() + tx := ms.db.C(utils.TBL_TP_RATING_PLANS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -466,7 +462,7 @@ func (ms *MongoStorage) SetTpRatingProfiles(tps []TpRatingProfile) error { } m := make(map[string]bool) - tx := ms.db.C(colTpRpf).Bulk() + tx := ms.db.C(utils.TBL_TP_RATE_PROFILES).Bulk() for _, tp := range tps { if found, _ := m[tp.GetRatingProfileId()]; !found { m[tp.GetRatingProfileId()] = true @@ -490,7 +486,7 @@ func (ms *MongoStorage) SetTpSharedGroups(tps []TpSharedGroup) error { } m := make(map[string]bool) - tx := ms.db.C(colTpShg).Bulk() + tx := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -507,7 +503,7 @@ func (ms *MongoStorage) SetTpCdrStats(tps []TpCdrstat) error { } m := make(map[string]bool) - tx := ms.db.C(colTpCrs).Bulk() + tx := ms.db.C(utils.TBL_TP_CDR_STATS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -524,7 +520,7 @@ func (ms *MongoStorage) SetTpUsers(tps []TpUser) error { } m := make(map[string]bool) - tx := ms.db.C(colTpUsr).Bulk() + tx := ms.db.C(utils.TBL_TP_USERS).Bulk() for _, tp := range tps { if found, _ := m[tp.GetId()]; !found { m[tp.GetId()] = true @@ -545,7 +541,7 @@ func (ms *MongoStorage) SetTpAliases(tps []TpAlias) error { } m := make(map[string]bool) - tx := ms.db.C(colTpAls).Bulk() + tx := ms.db.C(utils.TBL_TP_ALIASES).Bulk() for _, tp := range tps { if found, _ := m[tp.GetId()]; !found { m[tp.GetId()] = true @@ -569,7 +565,7 @@ func (ms *MongoStorage) SetTpDerivedChargers(tps []TpDerivedCharger) error { } m := make(map[string]bool) - tx := ms.db.C(colTpRpl).Bulk() + tx := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Bulk() for _, tp := range tps { if found, _ := m[tp.GetDerivedChargersId()]; !found { m[tp.GetDerivedChargersId()] = true @@ -592,7 +588,7 @@ func (ms *MongoStorage) SetTpLCRs(tps []TpLcrRule) error { } m := make(map[string]bool) - tx := ms.db.C(colTpRpl).Bulk() + tx := ms.db.C(utils.TBL_TP_LCRS).Bulk() for _, tp := range tps { if found, _ := m[tp.GetLcrRuleId()]; !found { m[tp.GetLcrRuleId()] = true @@ -615,7 +611,7 @@ func (ms *MongoStorage) SetTpActions(tps []TpAction) error { } m := make(map[string]bool) - tx := ms.db.C(colTpAct).Bulk() + tx := ms.db.C(utils.TBL_TP_ACTIONS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -632,7 +628,7 @@ func (ms *MongoStorage) SetTpActionPlans(tps []TpActionPlan) error { } m := make(map[string]bool) - tx := ms.db.C(colTpApl).Bulk() + tx := ms.db.C(utils.TBL_TP_ACTION_PLANS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -649,7 +645,7 @@ func (ms *MongoStorage) SetTpActionTriggers(tps []TpActionTrigger) error { } m := make(map[string]bool) - tx := ms.db.C(colTpAtr).Bulk() + tx := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -666,7 +662,7 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error { } m := make(map[string]bool) - tx := ms.db.C(colTpAcc).Bulk() + tx := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Bulk() for _, tp := range tps { if found, _ := m[tp.GetAccountActionId()]; !found { m[tp.GetAccountActionId()] = true @@ -681,3 +677,69 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error { _, err := tx.Run() return err } + +func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { + return ms.db.C(colLogAtr).Insert(&struct { + ubId string + ActionTrigger *ActionTrigger + Actions Actions + LogTime time.Time + Source string + }{ubId, at, as, time.Now(), source}) +} + +func (ms *MongoStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { + return ms.db.C(colLogApl).Insert(&struct { + ActionPlan *ActionPlan + Actions Actions + LogTime time.Time + Source string + }{at, as, time.Now(), source}) +} + +func (ms *MongoStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) error { + return ms.db.C(colLogCC).Insert(&struct { + Id string `bson:"id,omitempty"` + Source string + Runid string `bson:"runid,omitempty"` + CallCost *CallCost + }{cgrid, source, runid, cc}) +} + +func (ms *MongoStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { + result := &struct { + Id string `bson:"id,omitempty"` + Source string + Runid string `bson:"runid,omitempty"` + CallCost *CallCost + }{} + err = ms.db.C(colLogCC).Find(bson.M{"_id": cgrid, "source": source}).One(result) + cc = result.CallCost + return +} + +func (ms *MongoStorage) SetCdr(cdr *StoredCdr) error { + return ms.db.C(colCdrs).Insert(cdr) +} + +func (ms *MongoStorage) SetRatedCdr(storedCdr *StoredCdr) error { + return ms.db.C(colRatedCdrs).Insert(storedCdr) +} + +// Remove CDR data out of all CDR tables based on their cgrid +func (ms *MongoStorage) RemStoredCdrs(cgrIds []string) error { + if len(cgrIds) == 0 { + return nil + } + + for _, col := range []string{colCdrs, colRatedCdrs, colLogCC} { + if err := ms.db.C(col).Update(bson.M{"cgrid": bson.M{"$in": cgrIds}}, map[string]interface{}{"deleted_at": time.Now()}); err != nil { + return err + } + } + return nil +} + +func (ms *MongoStorage) GetStoredCdrs(qryFltr *utils.CdrsFilter) ([]*StoredCdr, int64, error) { + return nil, 0, utils.ErrNotImplemented +}