From fa591c50a7e668bfbf63d830f7c272735fedc450 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 21 Mar 2019 12:36:36 +0200 Subject: [PATCH] Added *remove_session_costs action. fixes#1423 --- engine/action.go | 27 ++++++++ engine/storage_interface.go | 1 + engine/storage_map_stordb.go | 3 + engine/storage_mongo_datadb.go | 1 + engine/storage_mongo_stordb.go | 17 +++++ engine/storage_sql.go | 60 ++++++++++++++++ utils/apitpdata.go | 121 +++++++++++++++++++++++++++++++++ 7 files changed, 230 insertions(+) diff --git a/engine/action.go b/engine/action.go index c862cb610..af4b0300e 100644 --- a/engine/action.go +++ b/engine/action.go @@ -84,6 +84,7 @@ const ( SetExpiry = "*set_expiry" MetaPublishAccount = "*publish_account" MetaPublishBalance = "*publish_balance" + MetaRemoveSessionCosts = "*remove_session_costs" ) func (a *Action) Clone() *Action { @@ -127,6 +128,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.MetaAMQPjsonMap: sendAMQP, utils.MetaAWSjsonMap: sendAWS, utils.MetaSQSjsonMap: sendSQS, + MetaRemoveSessionCosts: cleanSessionCosts, } f, exists := actionFuncMap[typ] return f, exists @@ -960,3 +962,28 @@ func (cdrP *cdrLogProvider) AsNavigableMap([]*config.FCTemplate) ( func (cdrP *cdrLogProvider) RemoteHost() net.Addr { return utils.LocalAddr() } + +func cleanSessionCosts(_ *Account, _ *Action, _ Actions, extraData interface{}) error { // FiltersID;inlineFilter + fltrs, err := utils.IfaceAsString(extraData) + if err != nil { + return err + } + tenant := config.CgrConfig().GeneralCfg().DefaultTenant + smcFilter := new(utils.SMCostFilter) + smcFilter.Usage = make([]*time.Duration, 2) + for _, fltrID := range strings.Split(fltrs, utils.INFIELD_SEP) { + fltr, err := dm.GetFilter(tenant, fltrID, true, true, utils.NonTransactional) + if err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Error: %s for filter: %s in action: <%s>", + utils.Actions, err.Error(), fltrID, MetaRemoveSessionCosts)) + continue + } + for _, rule := range fltr.Rules { + smcFilter, err = utils.AppendToSMCostFilter(smcFilter, rule.Type, rule.FieldName, rule.Values, config.CgrConfig().GeneralCfg().DefaultTimezone) + if err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> %s in action: <%s>", utils.Actions, err.Error(), MetaRemoveSessionCosts)) + } + } + } + return cdrStorage.RemoveSMCosts(smcFilter) +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index cab8665e3..82b5bf3e4 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -142,6 +142,7 @@ type CdrStorage interface { SetSMCost(smc *SMCost) error GetSMCosts(cgrid, runid, originHost, originIDPrfx string) ([]*SMCost, error) RemoveSMCost(*SMCost) error + RemoveSMCosts(qryFltr *utils.SMCostFilter) error GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error) } diff --git a/engine/storage_map_stordb.go b/engine/storage_map_stordb.go index 1e07fa14b..eeaf6d4d7 100755 --- a/engine/storage_map_stordb.go +++ b/engine/storage_map_stordb.go @@ -158,6 +158,9 @@ func (ms *MapStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) { func (ms *MapStorage) RemoveSMCost(smc *SMCost) (err error) { return utils.ErrNotImplemented } +func (ms *MapStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { + return utils.ErrNotImplemented +} func (ms *MapStorage) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*CDR, count int64, err error) { return nil, 0, utils.ErrNotImplemented } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 327dc49a2..e863a2e97 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -99,6 +99,7 @@ var ( CostDetailsLow = strings.ToLower(utils.CostDetails) DestinationLow = strings.ToLower(utils.Destination) CostLow = strings.ToLower(utils.COST) + CostSourceLow = strings.ToLower(utils.CostSource) tTime = reflect.TypeOf(time.Time{}) ) diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index a5b945476..bb33914b5 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -912,6 +912,23 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri return smcs, err } +func (ms *MongoStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { + filters := bson.M{ + CGRIDLow: bson.M{"$in": qryFltr.CgrIDs, "$nin": qryFltr.NotCgrIDs}, + RunIDLow: bson.M{"$in": qryFltr.RunIDs, "$nin": qryFltr.NotRunIDs}, + OriginHostLow: bson.M{"$in": qryFltr.OriginHosts, "$nin": qryFltr.NotOriginHosts}, + OriginIDLow: bson.M{"$in": qryFltr.OriginIDs, "$nin": qryFltr.NotOriginIDs}, + CostSourceLow: bson.M{"$in": qryFltr.CostSources, "$nin": qryFltr.NotCostSources}, + UsageLow: bson.M{"$gte": qryFltr.Usage[0], "$lt": qryFltr.Usage[1]}, + CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd}, + } + ms.cleanEmptyFilters(filters) + return ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, filters) + return err + }) +} + func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) { if cdr.OrderID == 0 { cdr.OrderID = ms.cnter.Next() diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 97d7f8e3f..d30f5acdd 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -739,6 +739,66 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error { return nil } +func (self *SQLStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { + q := self.db.Table(utils.SessionCostsTBL).Select("*") + // Add filters, use in to replace the high number of ORs + if len(qryFltr.CgrIDs) != 0 { + q = q.Where("cgrid in (?)", qryFltr.CgrIDs) + } + if len(qryFltr.NotCgrIDs) != 0 { + q = q.Where("cgrid not in (?)", qryFltr.NotCgrIDs) + } + if len(qryFltr.RunIDs) != 0 { + q = q.Where("run_id in (?)", qryFltr.RunIDs) + } + if len(qryFltr.NotRunIDs) != 0 { + q = q.Where("run_id not in (?)", qryFltr.NotRunIDs) + } + if len(qryFltr.OriginIDs) != 0 { + q = q.Where("origin_id in (?)", qryFltr.OriginIDs) + } + if len(qryFltr.NotOriginIDs) != 0 { + q = q.Where("origin_id not in (?)", qryFltr.NotOriginIDs) + } + if len(qryFltr.OriginHosts) != 0 { + q = q.Where("origin_host in (?)", qryFltr.OriginHosts) + } + if len(qryFltr.NotOriginHosts) != 0 { + q = q.Where("origin_host not in (?)", qryFltr.NotOriginHosts) + } + if len(qryFltr.CostSources) != 0 { + q = q.Where("costsource in (?)", qryFltr.CostSources) + } + if len(qryFltr.NotCostSources) != 0 { + q = q.Where("costsource not in (?)", qryFltr.NotCostSources) + } + if qryFltr.CreatedAtStart != nil { + q = q.Where("created_at >= ?", qryFltr.CreatedAtStart) + } + if qryFltr.CreatedAtEnd != nil { + q = q.Where("created_at < ?", qryFltr.CreatedAtEnd) + } + if qryFltr.Usage[0] != nil { + if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage + q = q.Where("`usage` >= ?", qryFltr.Usage[0].Nanoseconds()) + } else { + q = q.Where("usage >= ?", qryFltr.Usage[0].Nanoseconds()) + } + } + if qryFltr.Usage[1] != nil { + if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage + q = q.Where("`usage` < ?", qryFltr.Usage[1].Nanoseconds()) + } else { + q = q.Where("usage < ?", qryFltr.Usage[1].Nanoseconds()) + } + } + if err := q.Delete(nil).Error; err != nil { + q.Rollback() + return err + } + return nil +} + // GetSMCosts is used to retrieve one or multiple SMCosts based on filter func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) { var smCosts []*SMCost diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 84a21701f..fa556ec57 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1202,3 +1202,124 @@ type TPDispatcherProfile struct { Weight float64 Conns []*TPDispatcherConns } + +type SMCostFilter struct { //id cu litere mare + CgrIDs []string + NotCgrIDs []string + RunIDs []string + NotRunIDs []string + OriginHosts []string + NotOriginHosts []string + OriginIDs []string + NotOriginIDs []string + CostSources []string + NotCostSources []string + Usage []*time.Duration // slice min=Usage[0]&max=Usage[1] + CreatedAtStart *time.Time // Start of interval, bigger or equal than configured + CreatedAtEnd *time.Time // End interval, smaller than +} + +func AppendToSMCostFilter(smcFilter *SMCostFilter, fieldType, fieldName string, values []string, timezone string) (smcf *SMCostFilter, err error) { + const ( + MetaString = "*string" + MetaNotString = "*notstring" + MetaLessThan = "*lt" + MetaGreaterOrEqual = "*gte" + ) + switch fieldName { + case DynamicDataPrefix + CGRID: + switch fieldType { + case MetaString: + smcFilter.CgrIDs = append(smcFilter.CgrIDs, values...) + case MetaNotString: + smcFilter.NotCgrIDs = append(smcFilter.NotCgrIDs, values...) + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + RunID: + switch fieldType { + case MetaString: + smcFilter.RunIDs = append(smcFilter.RunIDs, values...) + case MetaNotString: + smcFilter.NotRunIDs = append(smcFilter.NotRunIDs, values...) + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + OriginHost: + switch fieldType { + case MetaString: + smcFilter.OriginHosts = append(smcFilter.OriginHosts, values...) + case MetaNotString: + smcFilter.NotOriginHosts = append(smcFilter.NotOriginHosts, values...) + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + OriginID: + switch fieldType { + case MetaString: + smcFilter.OriginIDs = append(smcFilter.OriginIDs, values...) + case MetaNotString: + smcFilter.NotOriginIDs = append(smcFilter.NotOriginIDs, values...) + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + CostSource: + switch fieldType { + case MetaString: + smcFilter.CostSources = append(smcFilter.CostSources, values...) + case MetaNotString: + smcFilter.CostSources = append(smcFilter.NotCostSources, values...) + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + Usage: + switch fieldType { + case MetaGreaterOrEqual: + var minUsage time.Duration + minUsage, err = ParseDurationWithNanosecs(values[0]) + if err != nil { + err = fmt.Errorf("Error when converting field: %q value: %q in float ", fieldType, fieldName) + break + } + smcFilter.Usage[0] = &minUsage + case MetaLessThan: + var maxUsage time.Duration + maxUsage, err = ParseDurationWithNanosecs(values[0]) + if err != nil { + err = fmt.Errorf("Error when converting field: %q value: %q in float ", fieldType, fieldName) + break + } + smcFilter.Usage[1] = &maxUsage + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + case DynamicDataPrefix + CreatedAt: + switch fieldType { + case MetaGreaterOrEqual: + var start time.Time + start, err = ParseTimeDetectLayout(values[0], timezone) + if err != nil { + err = fmt.Errorf("Error when converting field: %q value: %q in time.Time ", fieldType, fieldName) + break + } + if !start.IsZero() { + smcFilter.CreatedAtStart = &start + } + case MetaLessThan: + var end time.Time + end, err = ParseTimeDetectLayout(values[0], timezone) + if err != nil { + err = fmt.Errorf("Error when converting field: %q value: %q in time.Time ", fieldType, fieldName) + break + } + if !end.IsZero() { + smcFilter.CreatedAtEnd = &end + } + default: + err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName) + } + default: + err = fmt.Errorf("FieldName: %q not supported", fieldName) + } + return smcFilter, err +}