From d5039b3ce85c2faa55d3bcce62d23e2525abb0bf Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 8 Nov 2023 12:06:23 -0500 Subject: [PATCH] Add CDRs CRUD queries back and remove TP functions --- engine/cdr.go | 109 ++++ engine/storage_interface.go | 3 + engine/storage_internal_stordb.go | 264 +++++++++ engine/storage_mongo_datadb.go | 58 +- engine/storage_mongo_stordb.go | 221 ++++++-- engine/storage_mysql.go | 87 +++ engine/storage_postgres.go | 73 +++ engine/storage_sql.go | 863 ++++++++---------------------- utils/consts.go | 1 + 9 files changed, 974 insertions(+), 705 deletions(-) create mode 100644 engine/cdr.go create mode 100644 engine/storage_internal_stordb.go diff --git a/engine/cdr.go b/engine/cdr.go new file mode 100644 index 000000000..671c82ab9 --- /dev/null +++ b/engine/cdr.go @@ -0,0 +1,109 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "database/sql/driver" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/cgrates/cgrates/utils" +) + +type CDR struct { + Tenant string + Opts map[string]interface{} + Event map[string]interface{} + CreatedAt time.Time `json:",omitempty"` + UpdatedAt time.Time `json:",omitempty"` + DeletedAt *time.Time `json:",omitempty"` +} + +type CDRSQLTable struct { + ID int64 // this is used for incrementing while seting + Tenant string + Opts JSON `gorm:"type:jsonb"` //string + Event JSON `gorm:"type:jsonb"` //string + CreatedAt time.Time `json:",omitempty"` + UpdatedAt time.Time `json:",omitempty"` + DeletedAt *time.Time `json:",omitempty"` +} + +func (CDRSQLTable) TableName() string { + return utils.CDRsTBL +} + +// JSON type for storing maps of events and opts into gorm columns as jsob type +type JSON map[string]interface{} + +func (j JSON) GormDataType() string { + return "JSONB" +} + +// Scan scan value into Jsonb, implements sql.Scanner interface +func (j *JSON) Scan(value interface{}) (err error) { + switch v := value.(type) { + case []byte: + return json.Unmarshal(v, &j) + case string: + return json.Unmarshal([]byte(v), &j) + default: + return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) + } +} + +// Value return json value, implement driver.Valuer interface +func (j JSON) Value() (driver.Value, error) { + return json.Marshal(j) +} + +func GetUniqueCDRID(cgrEv *utils.CGREvent) string { + if chargeId, ok := cgrEv.APIOpts[utils.MetaChargeID]; ok { + return utils.IfaceAsString(chargeId) + } else if originID, ok := cgrEv.APIOpts[utils.MetaOriginID]; ok { + return utils.IfaceAsString(originID) + } + return utils.UUIDSha1Prefix() +} + +func NewCGREventFromCDR(cdr *CDR) *utils.CGREvent { + return &utils.CGREvent{ + Tenant: cdr.Tenant, + ID: utils.Sha1(), + Event: cdr.Event, + APIOpts: cdr.Opts, + } +} + +// checkNestedFields checks if there are elements or values nested (e.g *opts.*rateSCost.Cost) +func checkNestedFields(elem string, values []string) bool { + if len(strings.Split(elem, utils.NestingSep)) > 2 { + return true + } + for _, val := range values { + if len(strings.Split(val, utils.NestingSep)) > 2 { + return true + } + } + return false + +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 06b90467a..3da7cab61 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -108,6 +108,9 @@ type DataDBDriver interface { type StorDB interface { Storage + SetCDR(*utils.CGREvent, bool) error + GetCDRs(*context.Context, []*Filter, map[string]interface{}) ([]*CDR, error) + RemoveCDRs(*context.Context, []*Filter) error } type LoadStorage interface { diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go new file mode 100644 index 000000000..392bfe69b --- /dev/null +++ b/engine/storage_internal_stordb.go @@ -0,0 +1,264 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "slices" + "strings" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +// SetCDR for ManagerDB interface. SetCDR will set a single CDR in internal based on the CGREvent +func (iDB *InternalDB) SetCDR(cgrEv *utils.CGREvent, allowUpdate bool) error { + uniqueID := utils.IfaceAsString(cgrEv.APIOpts[utils.MetaCDRID]) + if !allowUpdate { + if _, has := iDB.db.Get(utils.MetaCDRs, uniqueID); has { + return utils.ErrExists + } + } + idx := make(utils.StringSet) + dp := cgrEv.AsDataProvider() + iDB.indexedFieldsMutex.RLock() + for _, v := range iDB.stringIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + } + for _, v := range iDB.prefixIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + for i := len(val) - 1; i > 0; i-- { + idx.Add(utils.ConcatenatedKey(v, val[:i])) + } + } + iDB.indexedFieldsMutex.RUnlock() + + iDB.db.Set(utils.MetaCDRs, uniqueID, cgrEv, idx.AsSlice(), true, utils.NonTransactional) + return nil +} + +func (iDB *InternalDB) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) { + pairFltrs := make(map[string][]string) + notPairFltrs := make(map[string][]string) + notIndexed := []*FilterRule{} + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + var elem string + if !slices.Contains(iDB.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) || + (rule.Type != utils.MetaString && rule.Type != utils.MetaNotString) { + notIndexed = append(notIndexed, rule) + continue + } + elem = strings.Trim(rule.Element, "~") + switch rule.Type { + case utils.MetaString: + pairFltrs[elem] = rule.Values + case utils.MetaNotString: + notPairFltrs[elem] = rule.Values + } + } + } + // find indexed fields + var cdrMpIDs utils.StringSet + // Apply string filter + for keySlice, fltrSlice := range pairFltrs { + if len(fltrSlice) == 0 { + continue + } + grpMpIDs := make(utils.StringSet) + for _, id := range fltrSlice { + grpMpIDs.AddSlice(iDB.db.GetGroupItemIDs(utils.MetaCDRs, utils.ConcatenatedKey(keySlice, id))) + } + if grpMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + if cdrMpIDs == nil { + cdrMpIDs = grpMpIDs + continue + } + cdrMpIDs.Intersect(grpMpIDs) + if cdrMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + } + if cdrMpIDs == nil { + cdrMpIDs = utils.NewStringSet(iDB.db.GetItemIDs(utils.MetaCDRs, utils.EmptyString)) + } + // check for Not filters + for keySlice, fltrSlice := range notPairFltrs { + if len(fltrSlice) == 0 { + continue + } + for _, id := range fltrSlice { + for _, id := range iDB.db.GetGroupItemIDs(utils.MetaCDRs, utils.ConcatenatedKey(keySlice, id)) { + cdrMpIDs.Remove(id) + if cdrMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + } + } + } + + events := []*utils.CGREvent{} + for key := range cdrMpIDs { + x, ok := iDB.db.Get(utils.MetaCDRs, key) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + cgrEv := x.(*utils.CGREvent) + cgrEvDP := cgrEv.AsDataProvider() + // checking pass for every filter that cannot be indexed + var pass bool = true + for _, fltr := range notIndexed { + if pass, err = fltr.Pass(ctx, cgrEvDP); err != nil { + return nil, err + } else if !pass { + break + } + } + if !pass { + continue + } + events = append(events, cgrEv) + } + if len(events) == 0 { + return nil, utils.ErrNotFound + } + // convert from event into CDRs + cdrs = make([]*CDR, len(events)) + for i, event := range events { + cdrs[i] = &CDR{ + Tenant: event.Tenant, + Opts: event.APIOpts, + Event: event.Event, + CreatedAt: time.Now(), + } + } + var limit, offset, maxItems int + if limit, offset, maxItems, err = utils.GetPaginateOpts(opts); err != nil { + return + } + cdrs, err = utils.Paginate(cdrs, limit, offset, maxItems) + return +} + +func (iDB *InternalDB) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) { + pairFltrs := make(map[string][]string) + notPairFltrs := make(map[string][]string) + notIndexed := []*FilterRule{} + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + var elem string + if !slices.Contains(iDB.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) || + (rule.Type != utils.MetaString && rule.Type != utils.MetaNotString) { + notIndexed = append(notIndexed, rule) + continue + } + elem = strings.Trim(rule.Element, "~") + switch rule.Type { + case utils.MetaString: + pairFltrs[elem] = rule.Values + case utils.MetaNotString: + notPairFltrs[elem] = rule.Values + } + } + } + // find indexed fields + var cdrMpIDs utils.StringSet + // Apply string filter + for keySlice, fltrSlice := range pairFltrs { + if len(fltrSlice) == 0 { + continue + } + grpMpIDs := make(utils.StringSet) + for _, id := range fltrSlice { + grpMpIDs.AddSlice(iDB.db.GetGroupItemIDs(utils.MetaCDRs, utils.ConcatenatedKey(keySlice, id))) + } + if grpMpIDs.Size() == 0 { + return utils.ErrNotFound + } + if cdrMpIDs == nil { + cdrMpIDs = grpMpIDs + continue + } + cdrMpIDs.Intersect(grpMpIDs) + if cdrMpIDs.Size() == 0 { + return utils.ErrNotFound + } + } + if cdrMpIDs == nil { + cdrMpIDs = utils.NewStringSet(iDB.db.GetItemIDs(utils.MetaCDRs, utils.EmptyString)) + } + // check for Not filters + for keySlice, fltrSlice := range notPairFltrs { + if len(fltrSlice) == 0 { + continue + } + for _, id := range fltrSlice { + for _, id := range iDB.db.GetGroupItemIDs(utils.MetaCDRs, utils.ConcatenatedKey(keySlice, id)) { + cdrMpIDs.Remove(id) + if cdrMpIDs.Size() == 0 { + return utils.ErrNotFound + } + } + } + } + // iterrate trough all CDRs found and select only those who match our filters + for key := range cdrMpIDs { + x, ok := iDB.db.Get(utils.MetaCDRs, key) + if !ok || x == nil { + return utils.ErrNotFound + } + cgrEv := x.(*utils.CGREvent) + cgrEvDP := cgrEv.AsDataProvider() + // checking pass for every filter that cannot be indexed + var pass bool = true + for _, fltr := range notIndexed { + if pass, err = fltr.Pass(ctx, cgrEvDP); err != nil { + return err + } else if !pass { + // the CDR DID NOT passed, so we will remove it + cdrMpIDs.Remove(key) + break + } + } + if !pass { + continue + } + } + // for every CDRs found, we delete matching by counter(key is a uniqueID) + for key := range cdrMpIDs { + iDB.db.Remove(utils.MetaCDRs, key, true, utils.NonTransactional) + } + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index e885b07e5..2822bb6f6 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1247,6 +1247,35 @@ func (ms *MongoStorage) GetRateProfileRatesDrv(ctx *context.Context, tenant, pro return } +func newAggregateStages(profileID, tenant, prefix string) (match, query bson.D) { + match = bson.D{{ + Key: "$match", Value: bson.M{ + "id": profileID, + "tenant": tenant, + }}, + } + query = bson.D{{ + Key: "$replaceRoot", Value: bson.D{{ + Key: "newRoot", Value: bson.D{{ + Key: "$arrayToObject", Value: bson.D{{ + Key: "$filter", Value: bson.D{ + {Key: "input", Value: bson.M{ + "$objectToArray": "$$ROOT", + }}, + {Key: "cond", Value: bson.D{{ + Key: "$regexFind", Value: bson.M{ + "input": "$$this.k", + "regex": prefix, + }, + }}}, + }, + }}, + }}, + }}, + }} + return +} + func (ms *MongoStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateProfile, optOverwrite bool) (err error) { rpMap, err := rpp.AsDataDBMap(ms.ms) if err != nil { @@ -1554,32 +1583,3 @@ func (ms *MongoStorage) RemoveConfigSectionsDrv(ctx *context.Context, nodeID str } return } - -func newAggregateStages(profileID, tenant, prefix string) (match, query bson.D) { - match = bson.D{{ - Key: "$match", Value: bson.M{ - "id": profileID, - "tenant": tenant, - }}, - } - query = bson.D{{ - Key: "$replaceRoot", Value: bson.D{{ - Key: "newRoot", Value: bson.D{{ - Key: "$arrayToObject", Value: bson.D{{ - Key: "$filter", Value: bson.D{ - {Key: "input", Value: bson.M{ - "$objectToArray": "$$ROOT", - }}, - {Key: "cond", Value: bson.D{{ - Key: "$regexFind", Value: bson.M{ - "input": "$$this.k", - "regex": prefix, - }, - }}}, - }, - }}, - }}, - }}, - }} - return -} diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 519222663..24bace86f 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "fmt" "regexp" "strings" "time" @@ -27,6 +28,7 @@ import ( "github.com/cgrates/cgrates/utils" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/bsonx" @@ -289,38 +291,6 @@ func (ms *MongoStorage) SetTPRStats(tps []*utils.TPStatProfile) (err error) { }) } -func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { - for k, v := range filters { - switch value := v.(type) { - case *int64: - if value == nil { - delete(filters, k) - } - case *float64: - if value == nil { - delete(filters, k) - } - case *time.Time: - if value == nil { - delete(filters, k) - } - case *time.Duration: - if value == nil { - delete(filters, k) - } - case []string: - if len(value) == 0 { - delete(filters, k) - } - case bson.M: - ms.cleanEmptyFilters(value) - if len(value) == 0 { - delete(filters, k) - } - } - } -} - func (ms *MongoStorage) SetTPStats(tpSTs []*utils.TPStatProfile) (err error) { if len(tpSTs) == 0 { return @@ -891,3 +861,190 @@ func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) { func (ms *MongoStorage) GetStorageType() string { return utils.MetaMongo } + +func (ms *MongoStorage) SetCDR(cdr *utils.CGREvent, allowUpdate bool) error { + if val, has := cdr.Event[utils.OrderID]; has && val == 0 { + cdr.Event[utils.OrderID] = ms.cnter.Next() + } + cdrTable := &CDR{ + Tenant: cdr.Tenant, + Opts: cdr.APIOpts, + Event: cdr.Event, + CreatedAt: time.Now(), + } + return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { + /* + if allowUpdate { + cdrTable.UpdatedAt = time.Now() + _, err = ms.getCol(ColCDRs).UpdateOne(sctx, + //bson.M{"_id": cdrTable.} + //bson.M{CGRIDLow: utils.IfaceAsString(cdr.Event[utils.CGRID])}, + bson.M{"$set": cdrTable}, options.Update().SetUpsert(true)) + return + } + */ + _, err = ms.getCol(ColCDRs).InsertOne(sctx, cdrTable) + if err != nil && strings.Contains(err.Error(), "E11000") { // Mongo returns E11000 when key is duplicated + err = utils.ErrExists + } + return + }) +} + +func (ms *MongoStorage) GetCDRs(_ *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) { + fltrs := make(bson.M) + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + if !cdrQueryFilterTypes.Has(rule.Type) { + continue + } + var elem string + if strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq) { + elem = "event." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+".") + } else { + elem = "opts." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+".") + } + fltrs[elem] = ms.valueQry(fltrs, elem, rule.Type, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + } + } + ms.cleanEmptyFilters(fltrs) + + fop := options.Find() + // cop := options.Count() + + limit, offset, maxItems, err := utils.GetPaginateOpts(opts) + if err != nil { + return nil, fmt.Errorf("could not retrieve paginator opts: %w", err) + } + if maxItems < limit+offset { + return nil, fmt.Errorf("sum of limit and offset exceeds maxItems") + } + fop.SetLimit(int64(limit)) + // cop.SetLimit(int64(limit)) + fop.SetSkip(int64(offset)) + // cop.SetSkip(int64(offset)) + + // Execute query + err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { + cur, err := ms.getCol(ColCDRs).Find(sctx, fltrs, fop) + if err != nil { + return err + } + for cur.Next(sctx) { + cdr := CDR{} + err := cur.Decode(&cdr) + if err != nil { + return err + } + clone := cdr + cdrs = append(cdrs, &clone) + } + if len(cdrs) == 0 { + return utils.ErrNotFound + } + return cur.Close(sctx) + }) + if err != nil { + return + } + cdrs, err = utils.Paginate(cdrs, 0, 0, int(maxItems)) + return +} + +func (ms *MongoStorage) valueQry(fltrs bson.M, elem, ruleType string, values []string, not bool) (m bson.M) { + msQuery, valChanged := getQueryType(ruleType, not, values) + v, has := fltrs[elem] + if !has { + m = make(bson.M) + fltrs[elem] = m + } else { + m = v.(bson.M) + } + if valChanged != nil { + if val, has := m[msQuery]; has { + m[msQuery] = append(val.([]primitive.Regex), valChanged.([]primitive.Regex)...) + } else { + m[msQuery] = valChanged + } + return + } + if val, has := m[msQuery]; has { + m[msQuery] = append(val.([]string), values...) + } else { + m[msQuery] = values + } + return +} + +func getQueryType(ruleType string, not bool, values []string) (msQuery string, valChanged any) { + switch ruleType { + case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: + msQuery = "$in" + if not { + msQuery = "$nin" + } + case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: + if ruleType == utils.MetaGreaterOrEqual { + msQuery = "$gte" + } else if ruleType == utils.MetaGreaterThan { + msQuery = "$gt" + } else if ruleType == utils.MetaLessOrEqual { + msQuery = "$lte" + } else if ruleType == utils.MetaLessThan { + msQuery = "$lt" + } + case utils.MetaPrefix, utils.MetaNotPrefix, utils.MetaSuffix, utils.MetaNotSuffix: + msQuery = "$in" + if not { + msQuery = "$nin" + } + regex := make([]bsonx.Val, 0, len(values)) + if ruleType == utils.MetaPrefix || ruleType == utils.MetaNotPrefix { + for _, val := range values { + regex = append(regex, bsonx.Regex("/^"+val+"/", utils.EmptyString)) + } + } else { + for _, val := range values { + regex = append(regex, bsonx.Regex("/"+val+"$/", utils.EmptyString)) + } + } + valChanged = regex + } + return +} + +func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { + for k, v := range filters { + switch value := v.(type) { + case *int64: + if value == nil { + delete(filters, k) + } + case *float64: + if value == nil { + delete(filters, k) + } + case *time.Time: + if value == nil { + delete(filters, k) + } + case *time.Duration: + if value == nil { + delete(filters, k) + } + case []string: + if len(value) == 0 { + delete(filters, k) + } + case bson.M: + ms.cleanEmptyFilters(value) + if len(value) == 0 { + delete(filters, k) + } + } + } +} + +func (ms *MongoStorage) RemoveCDRs(_ *context.Context, qryFltr []*Filter) (err error) { + return utils.ErrNotImplemented +} diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index cec2f21d7..83fe7ac29 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -106,6 +106,93 @@ func (msqlS *MySQLStorage) notExtraFieldsValueQry(field, value string) string { return fmt.Sprintf(" extra_fields NOT LIKE '%%\"%s\":\"%s\"%%'", field, value) } +// cdrIDQuery will query the CDR by its unique cdrID +func (msqlS *MySQLStorage) cdrIDQuery(cdrID string) string { + return fmt.Sprintf(" JSON_VALUE(opts, '$.\"*cdrID\"') = '%s'", cdrID) +} + +// existField will query for every element on json type if the field exists +func (msqlS *MySQLStorage) existField(elem, field string) string { + return fmt.Sprintf("!JSON_EXISTS(%s, '$.\"%s\"')", elem, field) +} + func (msqlS *MySQLStorage) GetStorageType() string { return utils.MetaMySQL } + +func (msqlS *MySQLStorage) valueQry(ruleType, elem, field string, values []string, not bool) (conditions []string) { + // here are for the filters that their values are empty: *exists, *notexists, *empty, *notempty.. + if len(values) == 0 { + switch ruleType { + case utils.MetaExists, utils.MetaNotExists: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') IS NULL", elem, field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') IS NOT NULL", elem, field)) + case utils.MetaEmpty, utils.MetaNotEmpty: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') != ''", elem, field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') != ''", elem, field)) + } + return + } + // here are for the filters that can have more than one value: *string, *prefix, *suffix .. + for _, value := range values { + value := verifyBool(value) // in case we have boolean values, it should be queried over 1 or 0 + var singleCond string + switch ruleType { + case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') != '%s'", + elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') = '%s'", elem, field, value) + case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: + if ruleType == utils.MetaGreaterOrEqual { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') >= %s", elem, field, value) + } else if ruleType == utils.MetaGreaterThan { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') > %s", elem, field, value) + } else if ruleType == utils.MetaLessOrEqual { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') <= %s", elem, field, value) + } else if ruleType == utils.MetaLessThan { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') < %s", elem, field, value) + } + case utils.MetaPrefix, utils.MetaNotPrefix: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') NOT LIKE '%s%%'", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') LIKE '%s%%'", elem, field, value) + case utils.MetaSuffix, utils.MetaNotSuffix: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') NOT LIKE '%%%s'", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') LIKE '%%%s'", elem, field, value) + case utils.MetaRegex, utils.MetaNotRegex: + if not { + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') NOT REGEXP '%s'", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.\"%s\"') REGEXP '%s'", elem, field, value) + } + conditions = append(conditions, singleCond) + } + return +} + +// verifyBool will check the value for booleans in roder to query properly +func verifyBool(value string) string { + switch value { + case "true": + return "1" + case "false": + return "0" + default: + return value + } +} diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 7a4662b64..81ea3c66a 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -97,6 +97,79 @@ func (poS *PostgresStorage) notExtraFieldsValueQry(field, value string) string { return fmt.Sprintf(" NOT (extra_fields ?'%s' AND (extra_fields ->> '%s') = '%s')", field, field, value) } +// cdrIDQuery will query the CDR by its unique cdrID +func (poS *PostgresStorage) cdrIDQuery(cdrID string) string { + return fmt.Sprintf(" opts ->> '*cdrID' = '%s'", cdrID) +} + +// existField will query for every element on json type if the field exists +func (poS *PostgresStorage) existField(elem, field string) string { + return fmt.Sprintf("NOT(%s ? '%s')", elem, field) +} + func (poS *PostgresStorage) GetStorageType() string { return utils.MetaPostgres } + +func (poS *PostgresStorage) valueQry(ruleType, elem, field string, values []string, not bool) (conditions []string) { + // here are for the filters that their values are empty: *exists, *notexists, *empty, *notempty.. + if len(values) == 0 { + switch ruleType { + case utils.MetaExists, utils.MetaNotExists: + if not { + conditions = append(conditions, fmt.Sprintf("NOT(%s ? '%s')", elem, field)) + return + } + conditions = append(conditions, fmt.Sprintf("%s ? '%s'", elem, field)) + case utils.MetaEmpty, utils.MetaNotEmpty: + if not { + conditions = append(conditions, fmt.Sprintf(" NOT (%s ->> '%s') = ''", elem, field)) + return + } + conditions = append(conditions, fmt.Sprintf(" (%s ->> '%s') = ''", elem, field)) + } + return + } + // here are for the filters that can have more than one value: *string, *prefix, *suffix .. + for _, value := range values { + var singleCond string + switch ruleType { + case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: + if not { + conditions = append(conditions, fmt.Sprintf(" NOT (%s ?'%s' AND (%s ->> '%s') = '%s')", elem, field, elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" (%s ->> '%s') = '%s'", elem, field, value) + case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: + if ruleType == utils.MetaGreaterOrEqual { + singleCond = fmt.Sprintf(" (%s ->> '%s')::numeric >= '%s'", elem, field, value) + } else if ruleType == utils.MetaGreaterThan { + singleCond = fmt.Sprintf(" (%s ->> '%s')::numeric > '%s'", elem, field, value) + } else if ruleType == utils.MetaLessOrEqual { + singleCond = fmt.Sprintf(" (%s ->> '%s')::numeric <= '%s'", elem, field, value) + } else if ruleType == utils.MetaLessThan { + singleCond = fmt.Sprintf(" (%s ->> '%s')::numeric < '%s'", elem, field, value) + } + case utils.MetaPrefix, utils.MetaNotPrefix: + if not { + conditions = append(conditions, fmt.Sprintf(" NOT ((%s ->> '%s') ILIKE '%s%%')", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" (%s ->> '%s') ILIKE '%s%%'", elem, field, value) + case utils.MetaSuffix, utils.MetaNotSuffix: + if not { + conditions = append(conditions, fmt.Sprintf(" NOT ((%s ->> '%s') ILIKE '%%%s')", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" (%s ->> '%s') ILIKE '%%%s'", elem, field, value) + case utils.MetaRegex, utils.MetaNotRegex: + if not { + conditions = append(conditions, fmt.Sprintf(" (%s ->> '%s') !~ '%s'", elem, field, value)) + continue + } + singleCond = fmt.Sprintf(" (%s ->> '%s') ~ '%s'", elem, field, value) + } + conditions = append(conditions, singleCond) + } + return +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 62170503d..657bdb283 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -24,6 +24,7 @@ import ( "os" "path" "strings" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" @@ -35,6 +36,9 @@ type SQLImpl interface { extraFieldsValueQry(string, string) string notExtraFieldsExistsQry(string) string notExtraFieldsValueQry(string, string) string + valueQry(string, string, string, []string, bool) []string // will query for every type of filtering in case of needed + cdrIDQuery(string) string // will get the unique *cdrID for every CDR + existField(string, string) string // will query for every element on json type if the field exists } type SQLStorage struct { @@ -92,657 +96,14 @@ func (sqls *SQLStorage) CreateTablesFromScript(scriptPath string) error { } func (sqls *SQLStorage) IsDBEmpty() (resp bool, err error) { - tbls := []string{ - utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds, - utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, - utils.TBLVersions, utils.TBLTPRoutes, utils.TBLTPAttributes, utils.TBLTPChargers, - utils.TBLTPDispatchers, utils.TBLTPDispatcherHosts, - } - for _, tbl := range tbls { + for _, tbl := range []string{utils.CDRsTBL, utils.TBLVersions} { if sqls.db.Migrator().HasTable(tbl) { return false, nil } - } return true, nil } -// update -// Return a list with all TPids defined in the system, even if incomplete, isolated in some table. -func (sqls *SQLStorage) GetTpIds(colName string) ([]string, error) { - var rows *sql.Rows - var err error - var qryStr string - if colName == "" { - for _, clNm := range []string{ - utils.TBLTPResources, - utils.TBLTPStats, - utils.TBLTPThresholds, - utils.TBLTPFilters, - utils.TBLTPRoutes, - utils.TBLTPAttributes, - utils.TBLTPChargers, - utils.TBLTPDispatchers, - utils.TBLTPDispatcherHosts, - } { - qryStr += fmt.Sprintf("UNION (SELECT tpid FROM %s)", clNm) - } - qryStr = strings.TrimPrefix(qryStr, "UNION ") - } else { - qryStr = fmt.Sprintf("(SELECT tpid FROM %s)", colName) - } - rows, err = sqls.DB.Query(qryStr) - if err != nil { - return nil, err - } - defer rows.Close() - ids := make([]string, 0) - i := 0 - for rows.Next() { - i++ //Keep here a reference so we know we got at least one - var id string - err = rows.Scan(&id) - if err != nil { - return nil, err - } - ids = append(ids, id) - } - if i == 0 { - return nil, nil - } - return ids, nil -} - -// ToDo: TEST -func (sqls *SQLStorage) GetTpTableIds(tpid, table string, distinct []string, - filters map[string]string, pagination *utils.PaginatorWithSearch) ([]string, error) { - qry := fmt.Sprintf("SELECT DISTINCT %s FROM %s where tpid='%s'", strings.Join(distinct, utils.FieldsSep), table, tpid) - for key, value := range filters { - if key != "" && value != "" { - qry += fmt.Sprintf(" AND %s='%s'", key, value) - } - } - if pagination != nil { - if len(pagination.Search) != 0 { - qry += fmt.Sprintf(" AND (%s LIKE '%%%s%%'", distinct[0], pagination.Search) - for _, d := range distinct[1:] { - qry += fmt.Sprintf(" OR %s LIKE '%%%s%%'", d, pagination.Search) - } - qry += ")" - } - if pagination.Paginator != nil { - if pagination.Limit != nil { // Keep Postgres compatibility by adding offset only when limit defined - qry += fmt.Sprintf(" LIMIT %d", *pagination.Limit) - if pagination.Offset != nil { - qry += fmt.Sprintf(" OFFSET %d", *pagination.Offset) - } - } - } - } - rows, err := sqls.DB.Query(qry) - if err != nil { - return nil, err - } - - defer rows.Close() - ids := []string{} - i := 0 - for rows.Next() { - i++ //Keep here a reference so we know we got at least one - - cols, err := rows.Columns() // Get the column names; remember to check err - if err != nil { - return nil, err - } - vals := make([]string, len(cols)) // Allocate enough values - ints := make([]any, len(cols)) // Make a slice of []any - for i := range ints { - ints[i] = &vals[i] // Copy references into the slice - } - - err = rows.Scan(ints...) - if err != nil { - return nil, err - } - finalID := vals[0] - if len(vals) > 1 { - finalID = strings.Join(vals, utils.ConcatenatedKeySep) - } - ids = append(ids, finalID) - } - if i == 0 { - return nil, nil - } - return ids, nil -} - -func (sqls *SQLStorage) RemTpData(table, tpid string, args map[string]string) error { - tx := sqls.db.Begin() - - if len(table) == 0 { // Remove tpid out of all tables - for _, tblName := range []string{ - utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds, - utils.TBLTPFilters, utils.TBLTPRoutes, utils.TBLTPAttributes, - utils.TBLTPChargers, utils.TBLTPDispatchers, utils.TBLTPDispatcherHosts, utils.TBLTPAccounts, - utils.TBLTPActionProfiles, utils.TBLTPRateProfiles} { - if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil { - tx.Rollback() - return err - } - } - tx.Commit() - return nil - } - // Remove from a single table - tx = tx.Table(table).Where("tpid = ?", tpid) - // Compose filters - for key, value := range args { - tx = tx.Where(key+" = ?", value) - } - if err := tx.Delete(nil).Error; err != nil { - tx.Rollback() - return err - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPResources(rls []*utils.TPResourceProfile) error { - if len(rls) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, rl := range rls { - // Remove previous - if err := tx.Where(&ResourceMdl{Tpid: rl.TPid, ID: rl.ID}).Delete(ResourceMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mrl := range APItoModelResource(rl) { - if err := tx.Create(&mrl).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPStats(sts []*utils.TPStatProfile) error { - if len(sts) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, stq := range sts { - // Remove previous - if err := tx.Where(&StatMdl{Tpid: stq.TPid, ID: stq.ID}).Delete(StatMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelStats(stq) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPThresholds(ths []*utils.TPThresholdProfile) error { - if len(ths) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, th := range ths { - // Remove previous - if err := tx.Where(&ThresholdMdl{Tpid: th.TPid, ID: th.ID}).Delete(ThresholdMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPThreshold(th) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPFilters(ths []*utils.TPFilterProfile) error { - if len(ths) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, th := range ths { - // Remove previous - if err := tx.Where(&FilterMdl{Tpid: th.TPid, ID: th.ID}).Delete(FilterMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPFilter(th) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPRoutes(tpRoutes []*utils.TPRouteProfile) error { - if len(tpRoutes) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, tpRoute := range tpRoutes { - // Remove previous - if err := tx.Where(&RouteMdl{Tpid: tpRoute.TPid, ID: tpRoute.ID}).Delete(RouteMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPRoutes(tpRoute) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPAttributes(tpAttrs []*utils.TPAttributeProfile) error { - if len(tpAttrs) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, stq := range tpAttrs { - // Remove previous - if err := tx.Where(&AttributeMdl{Tpid: stq.TPid, ID: stq.ID}).Delete(AttributeMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPAttribute(stq) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPChargers(tpCPPs []*utils.TPChargerProfile) error { - if len(tpCPPs) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, cpp := range tpCPPs { - // Remove previous - if err := tx.Where(&ChargerMdl{Tpid: cpp.TPid, ID: cpp.ID}).Delete(ChargerMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPCharger(cpp) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPDispatcherProfiles(tpDPPs []*utils.TPDispatcherProfile) error { - if len(tpDPPs) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, dpp := range tpDPPs { - // Remove previous - if err := tx.Where(&DispatcherProfileMdl{Tpid: dpp.TPid, ID: dpp.ID}).Delete(DispatcherProfileMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPDispatcherProfile(dpp) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) error { - if len(tpDPPs) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, dpp := range tpDPPs { - // Remove previous - if err := tx.Where(&DispatcherHostMdl{Tpid: dpp.TPid, ID: dpp.ID}).Delete(DispatcherHostMdl{}).Error; err != nil { - tx.Rollback() - return err - } - if err := tx.Create(APItoModelTPDispatcherHost(dpp)).Error; err != nil { - tx.Rollback() - return err - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPRateProfiles(tpDPPs []*utils.TPRateProfile) error { - if len(tpDPPs) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, dpp := range tpDPPs { - // Remove previous - if err := tx.Where(&RateProfileMdl{Tpid: dpp.TPid, ID: dpp.ID}).Delete(RateProfileMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPRateProfile(dpp) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPActionProfiles(tpAps []*utils.TPActionProfile) error { - if len(tpAps) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, tpAp := range tpAps { - // Remove previous - if err := tx.Where(&ActionProfileMdl{Tpid: tpAp.TPid, Tenant: tpAp.Tenant, ID: tpAp.ID}).Delete(ActionProfileMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPActionProfile(tpAp) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) SetTPAccounts(tpAps []*utils.TPAccount) error { - if len(tpAps) == 0 { - return nil - } - tx := sqls.db.Begin() - for _, tpAp := range tpAps { - // Remove previous - if err := tx.Where(&AccountMdl{Tpid: tpAp.TPid, Tenant: tpAp.Tenant, ID: tpAp.ID}).Delete(AccountMdl{}).Error; err != nil { - tx.Rollback() - return err - } - for _, mst := range APItoModelTPAccount(tpAp) { - if err := tx.Create(&mst).Error; err != nil { - tx.Rollback() - return err - } - } - } - tx.Commit() - return nil -} - -func (sqls *SQLStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPResourceProfile, error) { - var rls ResourceMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&rls).Error; err != nil { - return nil, err - } - arls := rls.AsTPResources() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) { - var sts StatMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&sts).Error; err != nil { - return nil, err - } - asts := sts.AsTPStats() - if len(asts) == 0 { - return asts, utils.ErrNotFound - } - return asts, nil -} - -func (sqls *SQLStorage) GetTPThresholds(tpid, tenant, id string) ([]*utils.TPThresholdProfile, error) { - var ths ThresholdMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&ths).Error; err != nil { - return nil, err - } - aths := ths.AsTPThreshold() - if len(aths) == 0 { - return aths, utils.ErrNotFound - } - return aths, nil -} - -func (sqls *SQLStorage) GetTPFilters(tpid, tenant, id string) ([]*utils.TPFilterProfile, error) { - var ths FilterMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&ths).Error; err != nil { - return nil, err - } - aths := ths.AsTPFilter() - if len(aths) == 0 { - return aths, utils.ErrNotFound - } - return aths, nil -} - -func (sqls *SQLStorage) GetTPRoutes(tpid, tenant, id string) ([]*utils.TPRouteProfile, error) { - var tpRoutes RouteMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&tpRoutes).Error; err != nil { - return nil, err - } - aTpRoutes := tpRoutes.AsTPRouteProfile() - if len(aTpRoutes) == 0 { - return aTpRoutes, utils.ErrNotFound - } - return aTpRoutes, nil -} - -func (sqls *SQLStorage) GetTPAttributes(tpid, tenant, id string) ([]*utils.TPAttributeProfile, error) { - var sps AttributeMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&sps).Error; err != nil { - return nil, err - } - arls := sps.AsTPAttributes() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPChargerProfile, error) { - var cpps ChargerMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&cpps).Error; err != nil { - return nil, err - } - arls := cpps.AsTPChargers() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPDispatcherProfiles(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) { - var dpps DispatcherProfileMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&dpps).Error; err != nil { - return nil, err - } - arls := dpps.AsTPDispatcherProfiles() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPDispatcherHosts(tpid, tenant, id string) ([]*utils.TPDispatcherHost, error) { - var dpps DispatcherHostMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&dpps).Error; err != nil { - return nil, err - } - arls, err := dpps.AsTPDispatcherHosts() - if err != nil { - return nil, err - } - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPRateProfiles(tpid, tenant, id string) ([]*utils.TPRateProfile, error) { - var dpps RateProfileMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&dpps).Error; err != nil { - return nil, err - } - arls := dpps.AsTPRateProfile() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPActionProfiles(tpid, tenant, id string) ([]*utils.TPActionProfile, error) { - var dpps ActionProfileMdls - q := sqls.db.Where("tpid = ?", tpid) - - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&dpps).Error; err != nil { - return nil, err - } - arls := dpps.AsTPActionProfile() - if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - -func (sqls *SQLStorage) GetTPAccounts(tpid, tenant, id string) ([]*utils.TPAccount, error) { - var dpps AccountMdls - q := sqls.db.Where("tpid = ?", tpid) - if len(id) != 0 { - q = q.Where("id = ?", id) - } - if len(tenant) != 0 { - q = q.Where("tenant = ?", tenant) - } - if err := q.Find(&dpps).Error; err != nil { - return nil, err - } - arls, err := dpps.AsTPAccount() - if err != nil { - return nil, err - } else if len(arls) == 0 { - return arls, utils.ErrNotFound - } - return arls, nil -} - // GetVersions returns slice of all versions or a specific version if tag is specified func (sqls *SQLStorage) GetVersions(itm string) (vrs Versions, err error) { q := sqls.db.Model(&TBLVersion{}) @@ -779,3 +140,217 @@ func (sqls *SQLStorage) RemoveVersions(vrs Versions) (err error) { tx.Commit() return } + +func (sqls *SQLStorage) SetCDR(cdr *utils.CGREvent, allowUpdate bool) error { + tx := sqls.db.Begin() + if tx.Error != nil { + return tx.Error + } + cdrTable := &CDRSQLTable{ + Tenant: cdr.Tenant, + Opts: cdr.APIOpts, + Event: cdr.Event, + CreatedAt: time.Now(), + } + saved := tx.Save(cdrTable) + if saved.Error != nil { + tx.Rollback() + if !allowUpdate { + if strings.Contains(saved.Error.Error(), "1062") || strings.Contains(saved.Error.Error(), "duplicate key") { // returns 1062/pq when key is duplicated + return utils.ErrExists + } + return saved.Error + } + tx = sqls.db.Begin() + if tx.Error != nil { + return tx.Error + } + + updated := tx.Model(&CDRSQLTable{}).Where( + sqls.cdrIDQuery(utils.IfaceAsString(cdr.APIOpts[utils.MetaCDRID]))).Updates( + CDRSQLTable{Opts: cdr.APIOpts, Event: cdr.Event, UpdatedAt: time.Now()}) + if updated.Error != nil { + tx.Rollback() + return updated.Error + } + } + tx.Commit() + return nil +} + +// GetCDRs has ability to get the filtered CDRs, count them or simply return them +// qryFltr.Unscoped will ignore soft deletes or delete records permanently +func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) { + q := sqls.db.Table(utils.CDRsTBL) + var excludedCdrQueryFilterTypes []*FilterRule + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + if !cdrQueryFilterTypes.Has(rule.Type) || checkNestedFields(rule.Element, rule.Values) { + excludedCdrQueryFilterTypes = append(excludedCdrQueryFilterTypes, rule) + continue + } + var elem, field string + switch { + case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): + elem = "event" + field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) + case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep): + elem = "opts" + field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) + } + var count int64 + if _ = sqls.db.Table(utils.CDRsTBL).Where( + sqls.existField(elem, field)).Count(&count); count > 0 && + (rule.Type == utils.MetaNotExists || + rule.Type == utils.MetaNotString) { + continue + } + conditions := sqls.valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + q.Where(strings.Join(conditions, " OR ")) + } + } + + limit, offset, maxItems, err := utils.GetPaginateOpts(opts) + if err != nil { + return nil, fmt.Errorf("could not retrieve paginator opts: %w", err) + } + if maxItems < limit+offset { + return nil, fmt.Errorf("sum of limit and offset exceeds maxItems") + } + q = q.Limit(limit) + q = q.Offset(offset) + + // Execute query + results := make([]*CDRSQLTable, 0) + if err = q.Find(&results).Error; err != nil { + return + } + if len(results) == 0 { + return nil, utils.ErrNotFound + } + //convert into CDR + resultCdr := make([]*CDR, 0, len(results)) + for _, val := range results { + // here we wil do our filtration, meaning that we will filter those cdrs who cannot be filtered in the databes eg: *ai, *rsr.. + if len(excludedCdrQueryFilterTypes) != 0 { + newCdr := &CDR{ + Tenant: val.Tenant, + Opts: val.Opts, + Event: val.Event, + } + var pass bool + dP := NewCGREventFromCDR(newCdr).AsDataProvider() + for _, fltr := range excludedCdrQueryFilterTypes { + if pass, err = fltr.Pass(ctx, dP); err != nil { + return nil, err + } else if !pass { + break + } + } + // if the cdr passed the filtration, get it as result, else continue + if !pass { + continue + } + } + resultCdr = append(resultCdr, &CDR{ + Tenant: val.Tenant, + Opts: val.Opts, + Event: val.Event, + CreatedAt: val.CreatedAt, + UpdatedAt: val.UpdatedAt, + DeletedAt: val.DeletedAt, + }) + } + if len(resultCdr) == 0 { + return nil, utils.ErrNotFound + } + if maxItems != 0 && len(resultCdr) > maxItems { + return nil, fmt.Errorf("maximum number of items exceeded") + } + cdrs, err = utils.Paginate(resultCdr, 0, 0, maxItems) + return +} + +func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) { + q := sqls.db.Table(utils.CDRsTBL) + var excludedCdrQueryFilterTypes []*FilterRule + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + if !cdrQueryFilterTypes.Has(rule.Type) || checkNestedFields(rule.Element, rule.Values) { + excludedCdrQueryFilterTypes = append(excludedCdrQueryFilterTypes, rule) + continue + } + var elem, field string + switch { + case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): + elem = "event" + field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) + case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep): + elem = "opts" + field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) + } + var count int64 + if _ = sqls.db.Table(utils.CDRsTBL).Where( + sqls.existField(elem, field)).Count(&count); count > 0 && + (rule.Type == utils.MetaNotExists || + rule.Type == utils.MetaNotString) { + continue + } + conditions := sqls.valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + q.Where(strings.Join(conditions, " OR ")) + } + } + // if we do not have any filters that cannot be queried in database, just delete all the results (e.g. *rsr, *ai, *cronexp ..)) + if len(excludedCdrQueryFilterTypes) == 0 { + if err = q.Delete(nil).Error; err != nil { + q.Rollback() + return err + } + return + } + // in the other case, if we have such filters, check the results based on those filters + results := make([]*CDRSQLTable, 0) + if err = q.Find(&results).Error; err != nil { + return + } + // this means nothing in database matched, so we will not check the filtration process + if len(results) == 0 { + return + } + // keep the result for quering with other filter type that are not allowed in database + q = sqls.db.Table(utils.CDRsTBL) // reset the query + remCdr := make([]string, 0, len(results)) // we will keep the *cdrID of every CDR taht matched the those filters + for _, cdr := range results { + if len(excludedCdrQueryFilterTypes) != 0 { + newCdr := &CDR{ + Tenant: cdr.Tenant, + Opts: cdr.Opts, + Event: cdr.Event, + } + var pass bool + dP := NewCGREventFromCDR(newCdr).AsDataProvider() + // check if the filter pass + for _, fltr := range excludedCdrQueryFilterTypes { + if pass, err = fltr.Pass(ctx, dP); err != nil { + return err + } else if !pass { + break + } + } + if pass { + // if the filters passed, remove the CDR by it's *cdrID + remCdr = append(remCdr, sqls.cdrIDQuery(utils.IfaceAsString(newCdr.Opts[utils.MetaCDRID]))) + } + } + } + // this means nothing PASSED trough filtration process, so nothing will be deleted + if len(remCdr) == 0 { + return + } + q.Where(strings.Join(remCdr, " OR ")) + if err = q.Delete(nil).Error; err != nil { + q.Rollback() + return err + } + return +} diff --git a/utils/consts.go b/utils/consts.go index 860dae476..28e632a6e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -167,6 +167,7 @@ const ( GOBCaps = "GOB" MsgPack = "msgpack" CSVLoad = "CSVLOAD" + MetaCDRID = "*cdrID" MetaOriginID = "*originID" ToR = "ToR" OrderID = "OrderID"