Attributes processed in responder

This commit is contained in:
DanB
2017-12-12 17:26:22 +01:00
parent 7032bea919
commit 4dbbfa0182
3 changed files with 177 additions and 23 deletions

View File

@@ -792,6 +792,78 @@ func (cdr *CDR) AsCDRsql() (cdrSql *CDRsql) {
return
}
func (cdr *CDR) AsCGREvent() *utils.CGREvent {
cdrIf, _ := cdr.AsMapStringIface()
return &utils.CGREvent{
Tenant: cdr.Tenant,
ID: utils.UUIDSha1Prefix(),
Event: cdrIf,
}
}
// UpdateFromCGREvent will update CDR with event fields from CGREvent
func (cdr *CDR) UpdateFromCGREvent(cgrEv *utils.CGREvent, fields []string) (err error) {
for _, fldName := range fields {
switch fldName {
case utils.CDRHOST:
if cdr.OriginHost, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.CDRSOURCE:
if cdr.Source, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.TOR:
if cdr.ToR, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.REQTYPE:
if cdr.RequestType, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.TENANT:
if cdr.Tenant, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.CATEGORY:
if cdr.Category, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.ACCOUNT:
if cdr.Account, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.SUBJECT:
if cdr.Subject, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.DESTINATION:
if cdr.Destination, err = cgrEv.FieldAsString(fldName); err != nil {
return
}
case utils.SETUP_TIME:
if cdr.SetupTime, err = cgrEv.FieldAsTime(fldName, config.CgrConfig().DefaultTimezone); err != nil {
return
}
case utils.ANSWER_TIME:
if cdr.AnswerTime, err = cgrEv.FieldAsTime(fldName, config.CgrConfig().DefaultTimezone); err != nil {
return
}
case utils.USAGE:
if cdr.Usage, err = cgrEv.FieldAsDuration(fldName); err != nil {
return
}
default:
fldVal, err := cgrEv.FieldAsString(fldName)
if err != nil {
return err
}
cdr.ExtraFields[fldName] = fldVal
}
}
return
}
// NewCDRFromSQL converts the CDRsql into CDR
func NewCDRFromSQL(cdrSql *CDRsql) (cdr *CDR, err error) {
cdr = new(CDR)

View File

@@ -198,13 +198,9 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
}
}
cdrIf, _ := cdr.AsMapStringIface()
if self.thdS != nil {
cgrEv := &utils.CGREvent{
Tenant: cdr.Tenant,
ID: utils.UUIDSha1Prefix(),
Event: cdrIf}
var hits int
cgrEv := cdr.AsCGREvent()
if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, cgrEv, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> error: %s processing CDR event %+v with thdS.", err.Error(), cgrEv))
@@ -216,13 +212,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
go self.cdrstats.Call("CDRStatsV1.AppendCDR", cdr, &out)
}
if self.stats != nil {
cgrEv := &utils.CGREvent{
Tenant: cdr.Tenant,
ID: utils.UUIDSha1Prefix(),
Event: cdrIf,
}
var reply string
go self.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply)
go self.stats.Call(utils.StatSv1ProcessEvent, cdr.AsCGREvent(), &reply)
}
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
self.replicateCDRs([]*CDR{cdr})
@@ -298,14 +289,8 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats,
}
}
if self.stats != nil {
cdrIf, _ := ratedCDR.AsMapStringIface()
cgrEv := &utils.CGREvent{
Tenant: cdr.Tenant,
ID: utils.UUIDSha1Prefix(),
Event: cdrIf,
}
var reply string
go self.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply)
go self.stats.Call(utils.StatSv1ProcessEvent, ratedCDR.AsCGREvent(), &reply)
}
}
}

View File

@@ -52,7 +52,6 @@ type Responder struct {
Timeout time.Duration
Timezone string
MaxComputedUsage map[string]time.Duration
cnt int64
responseCache *cache.ResponseCache
}
@@ -84,10 +83,20 @@ func (rs *Responder) usageAllowed(tor string, reqUsage time.Duration) (allowed b
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
rs.cnt += 1
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -124,6 +133,17 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -164,6 +184,17 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -211,6 +242,17 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -253,6 +295,17 @@ func (rs *Responder) RefundRounding(arg *CallDescriptor, reply *float64) (err er
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -288,6 +341,17 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
if arg.Subject == "" {
arg.Subject = arg.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
arg.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = arg.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil {
return err
@@ -314,7 +378,7 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
}
// Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it
func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error {
func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) (err error) {
cacheKey := utils.GET_DERIV_MAX_SESS_TIME + ev.CGRID + ev.RunID
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
@@ -325,6 +389,17 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error {
if ev.Subject == "" {
ev.Subject = ev.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
ev.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = ev.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil {
rs.getCache().Cache(cacheKey, &cache.CacheItem{Err: err})
@@ -422,7 +497,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error {
}
// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error {
func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) (err error) {
cacheKey := utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CGRID
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
@@ -433,6 +508,17 @@ func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error {
if ev.Subject == "" {
ev.Subject = ev.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
ev.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = ev.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
//utils.Logger.Info(fmt.Sprintf("DC before: %+v", ev))
// replace user profile fields
if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil {
@@ -522,7 +608,7 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u
return nil
}
func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) (err error) {
cacheKey := utils.LCRCachePrefix + attrs.CgrID + attrs.RunID
if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
@@ -533,6 +619,17 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error {
if attrs.CallDescriptor.Subject == "" {
attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account
}
if rs.AttributeS != nil {
var rplyEv AttrSProcessEventReply
if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent,
attrs.CallDescriptor.AsCGREvent(), &rplyEv); err != nil {
return
}
if err = attrs.CallDescriptor.UpdateFromCGREvent(rplyEv.CGREvent,
rplyEv.AlteredFields); err != nil {
return
}
}
// replace user profile fields
if err := LoadUserProfile(attrs.CallDescriptor, utils.EXTRA_FIELDS); err != nil {
return err