Added *remove_session_costs action. fixes#1423

This commit is contained in:
Trial97
2019-03-21 12:36:36 +02:00
committed by Dan Christian Bogos
parent 47e612f5ff
commit fa591c50a7
7 changed files with 230 additions and 0 deletions

View File

@@ -84,6 +84,7 @@ const (
SetExpiry = "*set_expiry"
MetaPublishAccount = "*publish_account"
MetaPublishBalance = "*publish_balance"
MetaRemoveSessionCosts = "*remove_session_costs"
)
func (a *Action) Clone() *Action {
@@ -127,6 +128,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
utils.MetaAMQPjsonMap: sendAMQP,
utils.MetaAWSjsonMap: sendAWS,
utils.MetaSQSjsonMap: sendSQS,
MetaRemoveSessionCosts: cleanSessionCosts,
}
f, exists := actionFuncMap[typ]
return f, exists
@@ -960,3 +962,28 @@ func (cdrP *cdrLogProvider) AsNavigableMap([]*config.FCTemplate) (
func (cdrP *cdrLogProvider) RemoteHost() net.Addr {
return utils.LocalAddr()
}
func cleanSessionCosts(_ *Account, _ *Action, _ Actions, extraData interface{}) error { // FiltersID;inlineFilter
fltrs, err := utils.IfaceAsString(extraData)
if err != nil {
return err
}
tenant := config.CgrConfig().GeneralCfg().DefaultTenant
smcFilter := new(utils.SMCostFilter)
smcFilter.Usage = make([]*time.Duration, 2)
for _, fltrID := range strings.Split(fltrs, utils.INFIELD_SEP) {
fltr, err := dm.GetFilter(tenant, fltrID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Error: %s for filter: %s in action: <%s>",
utils.Actions, err.Error(), fltrID, MetaRemoveSessionCosts))
continue
}
for _, rule := range fltr.Rules {
smcFilter, err = utils.AppendToSMCostFilter(smcFilter, rule.Type, rule.FieldName, rule.Values, config.CgrConfig().GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> %s in action: <%s>", utils.Actions, err.Error(), MetaRemoveSessionCosts))
}
}
}
return cdrStorage.RemoveSMCosts(smcFilter)
}

View File

@@ -142,6 +142,7 @@ type CdrStorage interface {
SetSMCost(smc *SMCost) error
GetSMCosts(cgrid, runid, originHost, originIDPrfx string) ([]*SMCost, error)
RemoveSMCost(*SMCost) error
RemoveSMCosts(qryFltr *utils.SMCostFilter) error
GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error)
}

View File

@@ -158,6 +158,9 @@ func (ms *MapStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
func (ms *MapStorage) RemoveSMCost(smc *SMCost) (err error) {
return utils.ErrNotImplemented
}
func (ms *MapStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error {
return utils.ErrNotImplemented
}
func (ms *MapStorage) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*CDR, count int64, err error) {
return nil, 0, utils.ErrNotImplemented
}

View File

@@ -99,6 +99,7 @@ var (
CostDetailsLow = strings.ToLower(utils.CostDetails)
DestinationLow = strings.ToLower(utils.Destination)
CostLow = strings.ToLower(utils.COST)
CostSourceLow = strings.ToLower(utils.CostSource)
tTime = reflect.TypeOf(time.Time{})
)

View File

@@ -912,6 +912,23 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
return smcs, err
}
func (ms *MongoStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error {
filters := bson.M{
CGRIDLow: bson.M{"$in": qryFltr.CgrIDs, "$nin": qryFltr.NotCgrIDs},
RunIDLow: bson.M{"$in": qryFltr.RunIDs, "$nin": qryFltr.NotRunIDs},
OriginHostLow: bson.M{"$in": qryFltr.OriginHosts, "$nin": qryFltr.NotOriginHosts},
OriginIDLow: bson.M{"$in": qryFltr.OriginIDs, "$nin": qryFltr.NotOriginIDs},
CostSourceLow: bson.M{"$in": qryFltr.CostSources, "$nin": qryFltr.NotCostSources},
UsageLow: bson.M{"$gte": qryFltr.Usage[0], "$lt": qryFltr.Usage[1]},
CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd},
}
ms.cleanEmptyFilters(filters)
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, filters)
return err
})
}
func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = ms.cnter.Next()

View File

@@ -739,6 +739,66 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
return nil
}
func (self *SQLStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error {
q := self.db.Table(utils.SessionCostsTBL).Select("*")
// Add filters, use in to replace the high number of ORs
if len(qryFltr.CgrIDs) != 0 {
q = q.Where("cgrid in (?)", qryFltr.CgrIDs)
}
if len(qryFltr.NotCgrIDs) != 0 {
q = q.Where("cgrid not in (?)", qryFltr.NotCgrIDs)
}
if len(qryFltr.RunIDs) != 0 {
q = q.Where("run_id in (?)", qryFltr.RunIDs)
}
if len(qryFltr.NotRunIDs) != 0 {
q = q.Where("run_id not in (?)", qryFltr.NotRunIDs)
}
if len(qryFltr.OriginIDs) != 0 {
q = q.Where("origin_id in (?)", qryFltr.OriginIDs)
}
if len(qryFltr.NotOriginIDs) != 0 {
q = q.Where("origin_id not in (?)", qryFltr.NotOriginIDs)
}
if len(qryFltr.OriginHosts) != 0 {
q = q.Where("origin_host in (?)", qryFltr.OriginHosts)
}
if len(qryFltr.NotOriginHosts) != 0 {
q = q.Where("origin_host not in (?)", qryFltr.NotOriginHosts)
}
if len(qryFltr.CostSources) != 0 {
q = q.Where("costsource in (?)", qryFltr.CostSources)
}
if len(qryFltr.NotCostSources) != 0 {
q = q.Where("costsource not in (?)", qryFltr.NotCostSources)
}
if qryFltr.CreatedAtStart != nil {
q = q.Where("created_at >= ?", qryFltr.CreatedAtStart)
}
if qryFltr.CreatedAtEnd != nil {
q = q.Where("created_at < ?", qryFltr.CreatedAtEnd)
}
if qryFltr.Usage[0] != nil {
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` >= ?", qryFltr.Usage[0].Nanoseconds())
} else {
q = q.Where("usage >= ?", qryFltr.Usage[0].Nanoseconds())
}
}
if qryFltr.Usage[1] != nil {
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` < ?", qryFltr.Usage[1].Nanoseconds())
} else {
q = q.Where("usage < ?", qryFltr.Usage[1].Nanoseconds())
}
}
if err := q.Delete(nil).Error; err != nil {
q.Rollback()
return err
}
return nil
}
// GetSMCosts is used to retrieve one or multiple SMCosts based on filter
func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) {
var smCosts []*SMCost

View File

@@ -1202,3 +1202,124 @@ type TPDispatcherProfile struct {
Weight float64
Conns []*TPDispatcherConns
}
type SMCostFilter struct { //id cu litere mare
CgrIDs []string
NotCgrIDs []string
RunIDs []string
NotRunIDs []string
OriginHosts []string
NotOriginHosts []string
OriginIDs []string
NotOriginIDs []string
CostSources []string
NotCostSources []string
Usage []*time.Duration // slice min=Usage[0]&max=Usage[1]
CreatedAtStart *time.Time // Start of interval, bigger or equal than configured
CreatedAtEnd *time.Time // End interval, smaller than
}
func AppendToSMCostFilter(smcFilter *SMCostFilter, fieldType, fieldName string, values []string, timezone string) (smcf *SMCostFilter, err error) {
const (
MetaString = "*string"
MetaNotString = "*notstring"
MetaLessThan = "*lt"
MetaGreaterOrEqual = "*gte"
)
switch fieldName {
case DynamicDataPrefix + CGRID:
switch fieldType {
case MetaString:
smcFilter.CgrIDs = append(smcFilter.CgrIDs, values...)
case MetaNotString:
smcFilter.NotCgrIDs = append(smcFilter.NotCgrIDs, values...)
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + RunID:
switch fieldType {
case MetaString:
smcFilter.RunIDs = append(smcFilter.RunIDs, values...)
case MetaNotString:
smcFilter.NotRunIDs = append(smcFilter.NotRunIDs, values...)
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + OriginHost:
switch fieldType {
case MetaString:
smcFilter.OriginHosts = append(smcFilter.OriginHosts, values...)
case MetaNotString:
smcFilter.NotOriginHosts = append(smcFilter.NotOriginHosts, values...)
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + OriginID:
switch fieldType {
case MetaString:
smcFilter.OriginIDs = append(smcFilter.OriginIDs, values...)
case MetaNotString:
smcFilter.NotOriginIDs = append(smcFilter.NotOriginIDs, values...)
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + CostSource:
switch fieldType {
case MetaString:
smcFilter.CostSources = append(smcFilter.CostSources, values...)
case MetaNotString:
smcFilter.CostSources = append(smcFilter.NotCostSources, values...)
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + Usage:
switch fieldType {
case MetaGreaterOrEqual:
var minUsage time.Duration
minUsage, err = ParseDurationWithNanosecs(values[0])
if err != nil {
err = fmt.Errorf("Error when converting field: %q value: %q in float ", fieldType, fieldName)
break
}
smcFilter.Usage[0] = &minUsage
case MetaLessThan:
var maxUsage time.Duration
maxUsage, err = ParseDurationWithNanosecs(values[0])
if err != nil {
err = fmt.Errorf("Error when converting field: %q value: %q in float ", fieldType, fieldName)
break
}
smcFilter.Usage[1] = &maxUsage
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
case DynamicDataPrefix + CreatedAt:
switch fieldType {
case MetaGreaterOrEqual:
var start time.Time
start, err = ParseTimeDetectLayout(values[0], timezone)
if err != nil {
err = fmt.Errorf("Error when converting field: %q value: %q in time.Time ", fieldType, fieldName)
break
}
if !start.IsZero() {
smcFilter.CreatedAtStart = &start
}
case MetaLessThan:
var end time.Time
end, err = ParseTimeDetectLayout(values[0], timezone)
if err != nil {
err = fmt.Errorf("Error when converting field: %q value: %q in time.Time ", fieldType, fieldName)
break
}
if !end.IsZero() {
smcFilter.CreatedAtEnd = &end
}
default:
err = fmt.Errorf("FilterType: %q not supported for FieldName: %q", fieldType, fieldName)
}
default:
err = fmt.Errorf("FieldName: %q not supported", fieldName)
}
return smcFilter, err
}