From 4dbbfa018271a5fe90d7f180e0fbad3ccec38554 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 12 Dec 2017 17:26:22 +0100 Subject: [PATCH] Attributes processed in responder --- engine/cdr.go | 72 +++++++++++++++++++++++++++++ engine/cdrs.go | 21 ++------- engine/responder.go | 107 +++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 177 insertions(+), 23 deletions(-) diff --git a/engine/cdr.go b/engine/cdr.go index 6c33eef33..b379d7948 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -792,6 +792,78 @@ func (cdr *CDR) AsCDRsql() (cdrSql *CDRsql) { return } +func (cdr *CDR) AsCGREvent() *utils.CGREvent { + cdrIf, _ := cdr.AsMapStringIface() + return &utils.CGREvent{ + Tenant: cdr.Tenant, + ID: utils.UUIDSha1Prefix(), + Event: cdrIf, + } +} + +// UpdateFromCGREvent will update CDR with event fields from CGREvent +func (cdr *CDR) UpdateFromCGREvent(cgrEv *utils.CGREvent, fields []string) (err error) { + for _, fldName := range fields { + switch fldName { + case utils.CDRHOST: + if cdr.OriginHost, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.CDRSOURCE: + if cdr.Source, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.TOR: + if cdr.ToR, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.REQTYPE: + if cdr.RequestType, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.TENANT: + if cdr.Tenant, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.CATEGORY: + if cdr.Category, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.ACCOUNT: + if cdr.Account, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.SUBJECT: + if cdr.Subject, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.DESTINATION: + if cdr.Destination, err = cgrEv.FieldAsString(fldName); err != nil { + return + } + case utils.SETUP_TIME: + if cdr.SetupTime, err = cgrEv.FieldAsTime(fldName, config.CgrConfig().DefaultTimezone); err != nil { + return + } + case utils.ANSWER_TIME: + if cdr.AnswerTime, err = cgrEv.FieldAsTime(fldName, config.CgrConfig().DefaultTimezone); err != nil { + return + } + case utils.USAGE: + if cdr.Usage, err = cgrEv.FieldAsDuration(fldName); err != nil { + return + } + default: + fldVal, err := cgrEv.FieldAsString(fldName) + if err != nil { + return err + } + cdr.ExtraFields[fldName] = fldVal + } + } + return +} + // NewCDRFromSQL converts the CDRsql into CDR func NewCDRFromSQL(cdrSql *CDRsql) (cdr *CDR, err error) { cdr = new(CDR) diff --git a/engine/cdrs.go b/engine/cdrs.go index 59ab737c0..9ad1abf66 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -198,13 +198,9 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { return err // Error is propagated back and we don't continue processing the CDR if we cannot store it } } - cdrIf, _ := cdr.AsMapStringIface() if self.thdS != nil { - cgrEv := &utils.CGREvent{ - Tenant: cdr.Tenant, - ID: utils.UUIDSha1Prefix(), - Event: cdrIf} var hits int + cgrEv := cdr.AsCGREvent() if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, cgrEv, &hits); err != nil { utils.Logger.Warning( fmt.Sprintf(" error: %s processing CDR event %+v with thdS.", err.Error(), cgrEv)) @@ -216,13 +212,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { go self.cdrstats.Call("CDRStatsV1.AppendCDR", cdr, &out) } if self.stats != nil { - cgrEv := &utils.CGREvent{ - Tenant: cdr.Tenant, - ID: utils.UUIDSha1Prefix(), - Event: cdrIf, - } var reply string - go self.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply) + go self.stats.Call(utils.StatSv1ProcessEvent, cdr.AsCGREvent(), &reply) } if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR self.replicateCDRs([]*CDR{cdr}) @@ -298,14 +289,8 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats, } } if self.stats != nil { - cdrIf, _ := ratedCDR.AsMapStringIface() - cgrEv := &utils.CGREvent{ - Tenant: cdr.Tenant, - ID: utils.UUIDSha1Prefix(), - Event: cdrIf, - } var reply string - go self.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply) + go self.stats.Call(utils.StatSv1ProcessEvent, ratedCDR.AsCGREvent(), &reply) } } } diff --git a/engine/responder.go b/engine/responder.go index a3c041a20..675c60495 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -52,7 +52,6 @@ type Responder struct { Timeout time.Duration Timezone string MaxComputedUsage map[string]time.Duration - cnt int64 responseCache *cache.ResponseCache } @@ -84,10 +83,20 @@ func (rs *Responder) usageAllowed(tor string, reqUsage time.Duration) (allowed b RPC method thet provides the external RPC interface for getting the rating information. */ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) { - rs.cnt += 1 if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -124,6 +133,17 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -164,6 +184,17 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -211,6 +242,17 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -253,6 +295,17 @@ func (rs *Responder) RefundRounding(arg *CallDescriptor, reply *float64) (err er if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -288,6 +341,17 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err if arg.Subject == "" { arg.Subject = arg.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + arg.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = arg.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(arg, utils.EXTRA_FIELDS); err != nil { return err @@ -314,7 +378,7 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err } // Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it -func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error { +func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) (err error) { cacheKey := utils.GET_DERIV_MAX_SESS_TIME + ev.CGRID + ev.RunID if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { if item.Value != nil { @@ -325,6 +389,17 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error { if ev.Subject == "" { ev.Subject = ev.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + ev.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = ev.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil { rs.getCache().Cache(cacheKey, &cache.CacheItem{Err: err}) @@ -422,7 +497,7 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *CDR, reply *float64) error { } // Used by SM to get all the prepaid CallDescriptors attached to a session -func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error { +func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) (err error) { cacheKey := utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CGRID if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { if item.Value != nil { @@ -433,6 +508,17 @@ func (rs *Responder) GetSessionRuns(ev *CDR, sRuns *[]*SessionRun) error { if ev.Subject == "" { ev.Subject = ev.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + ev.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = ev.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } //utils.Logger.Info(fmt.Sprintf("DC before: %+v", ev)) // replace user profile fields if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil { @@ -522,7 +608,7 @@ func (rs *Responder) GetDerivedChargers(attrs *utils.AttrDerivedChargers, dcs *u return nil } -func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { +func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) (err error) { cacheKey := utils.LCRCachePrefix + attrs.CgrID + attrs.RunID if item, err := rs.getCache().Get(cacheKey); err == nil && item != nil { if item.Value != nil { @@ -533,6 +619,17 @@ func (rs *Responder) GetLCR(attrs *AttrGetLcr, reply *LCRCost) error { if attrs.CallDescriptor.Subject == "" { attrs.CallDescriptor.Subject = attrs.CallDescriptor.Account } + if rs.AttributeS != nil { + var rplyEv AttrSProcessEventReply + if err = rs.AttributeS.Call(utils.AttributeSv1ProcessEvent, + attrs.CallDescriptor.AsCGREvent(), &rplyEv); err != nil { + return + } + if err = attrs.CallDescriptor.UpdateFromCGREvent(rplyEv.CGREvent, + rplyEv.AlteredFields); err != nil { + return + } + } // replace user profile fields if err := LoadUserProfile(attrs.CallDescriptor, utils.EXTRA_FIELDS); err != nil { return err