SessionS - processEvent function with sync actions

This commit is contained in:
DanB
2019-11-22 19:51:20 +01:00
parent 18999bcbbd
commit 360a9cc00b
7 changed files with 216 additions and 71 deletions

View File

@@ -275,6 +275,7 @@ func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher,
return
}
// rateCDRWithErr rates a CDR including errors
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) {
var err error
ratedCDRs, err = cdrS.rateCDR(cdr)
@@ -286,20 +287,21 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*C
return
}
// refundCDR will refund the EventCost within the CDR
func (cdrS *CDRServer) refundCDR(cdr *CDR) (err error) {
if !AccountableRequestTypes.HasField(cdr.RequestType) || cdr.CostDetails == nil {
// refundEventCost will refund the EventCost using RefundIncrements
func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err error) {
if ec == nil || !utils.AccountableRequestTypes.Has(reqType) {
return // non refundable
}
cd := cdr.CostDetails.AsRefundIncrements(cdr.ToR)
cd := ec.AsRefundIncrements(tor)
if cd == nil || len(cd.Increments) == 0 {
return
}
var acnt engine.Account
var acnt Account
if err = cdrS.rals.Call(utils.ResponderRefundIncrements,
&engine.CallDescriptorWithArgDispatcher{CallDescriptor: cd}, &acnt); err != nil {
&CallDescriptorWithArgDispatcher{CallDescriptor: cd}, &acnt); err != nil {
return
}
return
}
// chrgProcessEvent will process the CGREvent with ChargerS subsystem
@@ -325,7 +327,8 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
partExec = true
continue
}
for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) {
for _, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) {
arg := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: cgrEv.ArgDispatcher,
@@ -346,6 +349,29 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
return
}
// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
return
}
if len(chrgrs) == 0 {
return
}
cgrEvs = make([]*utils.CGREventWithArgDispatcher, len(chrgrs))
for i, cgrPrfl := range chrgrs {
cgrEvs[i] = &utils.CGREventWithArgDispatcher{
cgrPrfl.CGREvent,
cgrEv.ArgDispatcher,
}
}
return
}
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
var rplyEv AttrSProcessEventReply
@@ -369,40 +395,36 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher)
return
}
// thdSProcessEvent will send the event to ThresholdS if the connection is configured
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) {
// thdSProcessEvent will send the event to ThresholdS
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
var tIDs []string
// we clone the CGREvent so we can add EventType without it to be propagated
// we clone the CGREvent so we can add EventType without being propagated
thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()}
thArgs.CGREvent.Event[utils.EventType] = utils.CDR
if cgrEv.ArgDispatcher != nil {
thArgs.ArgDispatcher = cgrEv.ArgDispatcher
}
if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
if err = cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
thArgs, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.",
utils.CDRs, err.Error(), cgrEv))
return
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) {
// statSProcessEvent will send the event to StatS
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
var reply []string
statArgs := &StatsArgsProcessEvent{CGREvent: cgrEv.CGREvent}
if cgrEv.ArgDispatcher != nil {
statArgs.ArgDispatcher = cgrEv.ArgDispatcher
}
if err := cdrS.statS.Call(utils.StatSv1ProcessEvent,
if err = cdrS.statS.Call(utils.StatSv1ProcessEvent,
statArgs, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.StatS))
return
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// exportCDRs will export the CDRs received
@@ -426,6 +448,122 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
return
}
// processEvent processes a CGREvent based on arguments
func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
chrgS, attrS, ralS, store, reRate, export, thdS, stS bool) (err error) {
var cgrEvs []*utils.CGREventWithArgDispatcher
if chrgS {
if cgrEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), ev, utils.ChargerS))
err = utils.ErrPartiallyExecuted
return
}
} else { // ChargerS not requested, charge the original event
cgrEvs = []*utils.CGREventWithArgDispatcher{ev}
}
if attrS {
for _, cgrEv := range cgrEvs {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), cgrEv, utils.AttributeS))
err = utils.ErrPartiallyExecuted
return
}
}
}
cdrs := make([]*CDR, len(cgrEvs))
if ralS || store || reRate || export {
for i, cgrEv := range cgrEvs {
if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> converting event %+v to CDR",
utils.CDRs, err.Error(), cgrEv))
err = utils.ErrPartiallyExecuted
return
}
}
}
if ralS {
for i, cdr := range cdrs {
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: ev.ArgDispatcher}) {
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: ev.ArgDispatcher,
}
if j == 0 { // the first CDR will replace the events we got already as a small optimization
cdrs[i] = rtCDR
cgrEvs[i] = cgrEv
} else {
cdrs = append(cdrs, cdr)
cgrEvs = append(cgrEvs, cgrEv)
}
}
}
}
if store {
for _, cdr := range cdrs {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err.Error() == "duplicate" && reRate { // fix error name here
if err = cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, err.Error(), cdr))
err = utils.ErrPartiallyExecuted
return
}
// after refund we can force update
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), cdr))
err = utils.ErrPartiallyExecuted
return
}
}
return
}
}
}
var partiallyExecuted bool // from here actions are optional and a general error is returned
if export {
if err = cdrS.exportCDRs(cdrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v",
utils.CDRs, err.Error(), cdrs))
}
}
if thdS {
for _, cgrEv := range cgrEvs {
if err = cdrS.thdSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), cgrEv, utils.ThresholdS))
}
}
}
if stS {
for _, cgrEv := range cgrEvs {
if err = cdrS.statSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), cgrEv, utils.StatS))
}
}
}
if partiallyExecuted {
err = utils.ErrPartiallyExecuted
}
return
}
// Call implements the rpcclient.RpcClientConnection interface
func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
@@ -539,14 +677,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
}
type ArgV1ProcessEvent struct {
Flags []string
utils.CGREvent
AttributeS *bool // control AttributeS processing
RALs *bool // control if we rate the event
ChargerS *bool // control ChargerS processing
Store *bool // control storing of the CDR
Export *bool // control online exports for the CDR
ThresholdS *bool // control ThresholdS
StatS *bool // control sending the CDR to StatS for aggregation
*utils.ArgDispatcher
}
@@ -575,34 +707,41 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
}
// end of RPC caching
// processing options
var flgs utils.FlagsWithParams
if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil {
return
}
attrS := cdrS.attrS != nil
if arg.AttributeS != nil {
attrS = *arg.AttributeS
if flgs.HasKey(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if arg.Store != nil {
store = *arg.Store
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
if arg.Export != nil {
export = *arg.Export
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := cdrS.thdS != nil
if arg.ThresholdS != nil {
thdS = *arg.ThresholdS
if flgs.HasKey(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
statS := cdrS.statS != nil
if arg.StatS != nil {
statS = *arg.StatS
if flgs.HasKey(utils.MetaStats) {
statS = flgs.GetBool(utils.MetaStats)
}
chrgS := cdrS.chargerS != nil
if arg.ChargerS != nil {
chrgS = *arg.ChargerS
chrgS := cdrS.chargerS != nil // activate charging for the Event
if flgs.HasKey(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // by default we don't extra charge the received CDR
if arg.RALs != nil {
ralS = *arg.RALs
var ralS bool // activate single rating for the CDR
if flgs.HasKey(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
// end of processing options
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: &arg.CGREvent,
}

View File

@@ -288,7 +288,7 @@ func (ec *EventCost) AsRefundIncrements(tor string) (cd *CallDescriptor) {
blncSmry := ec.AccountSummary.BalanceSummaries.BalanceSummaryWithUUD(ec.Accounting[cIcrm.AccountingID].BalanceUUID)
if blncSmry.Type == utils.MONETARY {
cd.Increments[iIdx].BalanceInfo.Monetary = &MonetaryInfo{UUID: blncSmry.UUID}
} else if NonMonetaryBalances.HasField(blncSmry.Type) {
} else if utils.NonMonetaryBalances.Has(blncSmry.Type) {
cd.Increments[iIdx].BalanceInfo.Unit = &UnitInfo{UUID: blncSmry.UUID}
}
if ec.Accounting[cIcrm.AccountingID].ExtraChargeID == utils.META_NONE ||
@@ -301,7 +301,7 @@ func (ec *EventCost) AsRefundIncrements(tor string) (cd *CallDescriptor) {
ec.Accounting[ec.Accounting[cIcrm.AccountingID].ExtraChargeID].BalanceUUID)
if extraSmry.Type == utils.MONETARY {
cd.Increments[iIdx].BalanceInfo.Monetary = &MonetaryInfo{UUID: extraSmry.UUID}
} else if NonMonetaryBalances.HasField(blncSmry.Type) {
} else if utils.NonMonetaryBalances.Has(blncSmry.Type) {
cd.Increments[iIdx].BalanceInfo.Unit = &UnitInfo{UUID: extraSmry.UUID}
}
}

View File

@@ -27,20 +27,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
// NonMonetaryBalances are types of balances which are not handled as monetary
var NonMonetaryBalances = MapEvent{
utils.VOICE: struct{}{},
utils.SMS: struct{}{},
utils.DATA: struct{}{},
utils.GENERIC: struct{}{},
}
var AccountableRequestTypes = MapEvent{
utils.MetaPrepaid: struct{}{},
utils.MetaPostpaid: struct{}{},
utils.MetaPseudoprepaid: struct{}{},
}
// ChargingInterval represents one interval out of Usage providing charging info
// eg: PEAK vs OFFPEAK
type ChargingInterval struct {

View File

@@ -397,14 +397,14 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
var reply string
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers),
fmt.Sprintf("%s:false", utils.MetaAttributes)},
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false),
ArgDispatcher: s.ArgDispatcher,
}
if unratedReqs.HasField( // order additional rating for unrated request types
engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
argsProc.RALs = utils.BoolPointer(true)
argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs))
}
if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil {
utils.Logger.Warning(
@@ -2637,14 +2637,14 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers),
fmt.Sprintf("%s:false", utils.MetaAttributes)},
CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false),
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher,
}
if mp := engine.MapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw
unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types
argsProc.RALs = utils.BoolPointer(true)
argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs))
}
if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent,
argsProc, rply); err != nil {
@@ -2990,8 +2990,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
//convert from Flags []string to utils.FlagsWithParams
argsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(args.Flags)
if err != nil {
var argsFlagsWithParams utils.FlagsWithParams
if argsFlagsWithParams, err = utils.FlagsWithParamsFromSlice(args.Flags); err != nil {
return
}
// check for *attribute

View File

@@ -102,6 +102,12 @@ var (
DispatcherProfilePrefix: CacheDispatcherFilterIndexes,
}
CacheIndexesToPrefix map[string]string // will be built on init
// NonMonetaryBalances are types of balances which are not handled as monetary
NonMonetaryBalances = NewStringSet([]string{VOICE, SMS, DATA, GENERIC})
// AccountableRequestTypes are the ones handled by Accounting subsystem
AccountableRequestTypes = NewStringSet([]string{META_PREPAID, META_POSTPAID, META_PSEUDOPREPAID})
)
const (
@@ -508,6 +514,7 @@ const (
MetaRemove = "*remove"
MetaStore = "*store"
MetaClear = "*clear"
MetaExport = "*export"
LoadIDs = "load_ids"
DNSAgent = "DNSAgent"
TLSNoCaps = "tls"

View File

@@ -288,3 +288,15 @@ func (fWp FlagsWithParams) SliceFlags() (sls []string) {
}
return
}
// GetBool returns the flag as boolean
func (fWp FlagsWithParams) GetBool(key string) (b bool) {
var v interface{}
if _, b = fWp[key]; !b {
return // not present means false
}
if v.(string) != "" && v.(string) != "true" {
return // not empty nor true means false again
}
return true
}

View File

@@ -24,6 +24,7 @@ func NewStringSet(dataSlice []string) (s *StringSet) {
return s
}
// StringSet will manage data within a set
type StringSet struct {
data map[string]struct{}
}