Add StatIDs option for Stat ProcessEvent

This commit is contained in:
TeoV
2018-08-31 06:38:28 -04:00
committed by Dan Christian Bogos
parent 4c0f575302
commit cf24c1919f
13 changed files with 162 additions and 130 deletions

View File

@@ -209,7 +209,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
}
if self.stats != nil {
var reply []string
go self.stats.Call(utils.StatSv1ProcessEvent, cdr.AsCGREvent(), &reply)
go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cdr.AsCGREvent()}, &reply)
}
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
self.replicateCDRs([]*CDR{cdr})
@@ -296,7 +297,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats,
}
if self.stats != nil {
var reply []string
go self.stats.Call(utils.StatSv1ProcessEvent, ratedCDR.AsCGREvent(), &reply)
go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *ratedCDR.AsCGREvent()}, &reply)
}
}
}
@@ -677,7 +678,7 @@ func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) {
var reply []string
if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply); err != nil &&
if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",

View File

@@ -145,27 +145,33 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
}
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQueues, err error) {
func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (sqs StatQueues, err error) {
matchingSQs := make(map[string]*StatQueue)
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
sS.dm, utils.CacheStatFilterIndexes, ev.Tenant, sS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
var sqIDs []string
if len(args.StatIDs) != 0 {
sqIDs = args.StatIDs
} else {
mapIDs, err := matchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
}
sqIDs = mapIDs.Slice()
}
for sqID := range sqIDs {
sqPrfl, err := sS.dm.GetStatQueueProfile(ev.Tenant, sqID, false, utils.NonTransactional)
for _, sqID := range sqIDs {
sqPrfl, err := sS.dm.GetStatQueueProfile(args.Tenant, sqID, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
if sqPrfl.ActivationInterval != nil && ev.Time != nil &&
!sqPrfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
if sqPrfl.ActivationInterval != nil && args.Time != nil &&
!sqPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active
continue
}
if pass, err := sS.filterS.Pass(ev.Tenant, sqPrfl.FilterIDs,
config.NewNavigableMap(ev.Event)); err != nil {
if pass, err := sS.filterS.Pass(args.Tenant, sqPrfl.FilterIDs,
config.NewNavigableMap(args.Event)); err != nil {
return nil, err
} else if !pass {
continue
@@ -209,10 +215,15 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf
return utils.RPCCall(ss, serviceMethod, args, reply)
}
type StatsArgsProcessEvent struct {
StatIDs []string
utils.CGREvent
}
// processEvent processes a new event, dispatching to matching queues
// queues matching are also cached to speed up
func (sS *StatService) processEvent(ev *utils.CGREvent) (statQueueIDs []string, err error) {
matchSQs, err := sS.matchingStatQueuesForEvent(ev)
func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs []string, err error) {
matchSQs, err := sS.matchingStatQueuesForEvent(args)
if err != nil {
return nil, err
}
@@ -225,12 +236,12 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (statQueueIDs []string,
stsIDs = append(stsIDs, sq.ID)
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
err = sq.ProcessEvent(ev)
err = sq.ProcessEvent(&args.CGREvent)
guardian.Guardian.UnguardIDs(lkID)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), ev.TenantID(), err.Error()))
sq.TenantID(), args.TenantID(), err.Error()))
withErrors = true
}
if sS.storeInterval != 0 && sq.dirty != nil { // don't save
@@ -280,13 +291,13 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (statQueueIDs []string,
}
// V1ProcessEvent implements StatV1 method for processing an Event
func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *[]string) (err error) {
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
} else if ev.Event == nil {
} else if args.Event == nil {
return utils.NewErrMandatoryIeMissing("Event")
}
if ids, err := sS.processEvent(ev); err != nil {
if ids, err := sS.processEvent(args); err != nil {
return err
} else {
*reply = ids
@@ -295,14 +306,14 @@ func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *[]string) (err
}
// V1StatQueuesForEvent implements StatV1 method for processing an Event
func (sS *StatService) V1GetStatQueuesForEvent(ev *utils.CGREvent, reply *[]string) (err error) {
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
} else if ev.Event == nil {
} else if args.Event == nil {
return utils.NewErrMandatoryIeMissing("Event")
}
var sQs StatQueues
if sQs, err = sS.matchingStatQueuesForEvent(ev); err != nil {
if sQs, err = sS.matchingStatQueuesForEvent(args); err != nil {
return
} else {
ids := make([]string, len(sQs))

View File

@@ -101,38 +101,44 @@ var (
&StatQueue{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]},
&StatQueue{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]},
}
statsEvs = []*utils.CGREvent{
&utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
"Stats": "StatQueueProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
"Weight": "9.0",
utils.Usage: time.Duration(135 * time.Second),
utils.COST: 123.0,
statsEvs = []*StatsArgsProcessEvent{
&StatsArgsProcessEvent{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
"Stats": "StatQueueProfile1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
"Weight": "9.0",
utils.Usage: time.Duration(135 * time.Second),
utils.COST: 123.0,
},
},
},
&utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Event: map[string]interface{}{
"Stats": "StatQueueProfile2",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
"Weight": "15.0",
utils.Usage: time.Duration(45 * time.Second),
&StatsArgsProcessEvent{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Event: map[string]interface{}{
"Stats": "StatQueueProfile2",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"UsageInterval": "1s",
"PddInterval": "1s",
"Weight": "15.0",
utils.Usage: time.Duration(45 * time.Second),
},
},
},
&utils.CGREvent{
Tenant: "cgrates.org",
ID: "event3",
Event: map[string]interface{}{
"Stats": "StatQueueProfilePrefix",
utils.Usage: time.Duration(30 * time.Second),
&StatsArgsProcessEvent{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "event3",
Event: map[string]interface{}{
"Stats": "StatQueueProfilePrefix",
utils.Usage: time.Duration(30 * time.Second),
},
},
},
}