From dbff560dfb5d660d2d1ca6e522ad17b799182b22 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 21 May 2018 05:31:46 -0400 Subject: [PATCH] Stats Process Event return now StatQueueIDs --- agents/kamevent.go | 4 + apier/v1/filterindexecache_it_test.go | 117 ++++++++++++------------ apier/v1/sessions_thresholds_it_test.go | 1 + apier/v1/sessionsv1_it_test.go | 1 + apier/v1/stats.go | 2 +- apier/v1/stats_it_test.go | 26 +++--- console/stats_process_event.go | 2 +- engine/cdrs.go | 4 +- engine/stats.go | 21 +++-- engine/stats_test.go | 22 +++-- sessions/sessions.go | 34 +++++-- utils/consts.go | 3 +- 12 files changed, 141 insertions(+), 96 deletions(-) diff --git a/agents/kamevent.go b/agents/kamevent.go index 855ea1779..c783cb898 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -273,6 +273,9 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs, if authArgs.ProcessThresholds != nil && *authArgs.ProcessThresholds { kar.Thresholds = strings.Join(*authReply.ThresholdIDs, utils.FIELDS_SEP) } + if authArgs.ProcessStatQueues != nil && *authArgs.ProcessStatQueues { + kar.StatQueues = strings.Join(*authReply.StatQueueIDs, utils.FIELDS_SEP) + } return } @@ -346,6 +349,7 @@ type KamAuthReply struct { MaxUsage int // Maximum session time in case of success, -1 for unlimited Suppliers string // List of suppliers, comma separated Thresholds string + StatQueues string Error string // Reply in case of error } diff --git a/apier/v1/filterindexecache_it_test.go b/apier/v1/filterindexecache_it_test.go index d40173898..c1f081b00 100644 --- a/apier/v1/filterindexecache_it_test.go +++ b/apier/v1/filterindexecache_it_test.go @@ -559,7 +559,7 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) { //StatQueue func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) { - var reply *string + var reply *[]string tEv := &utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", @@ -571,8 +571,6 @@ func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) { if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if reply != nil && *reply != "" { - t.Error("Unexpected reply returned", *reply) } if indexes, err = onStor.GetFilterReverseIndexes( utils.PrefixToIndexCache[utils.StatQueueProfilePrefix], "cgrates.org", @@ -603,6 +601,7 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { }, } var result string + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { t.Error(err) } else if result != utils.OK { @@ -643,11 +642,13 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { utils.Account: "1001", }, } + var reply []string + expected := []string{"TEST_PROFILE1"} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, - tEv, &result); err != nil { + tEv, &reply); err != nil { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } fldNameVal := map[string]string{"TEST_PROFILE1": ""} @@ -665,7 +666,8 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { } func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { - var reply string + var reply []string + expected := []string{"Stats1"} ev2 := utils.CGREvent{ Tenant: "cgrates.org", ID: "event2", @@ -675,8 +677,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { utils.Usage: time.Duration(45 * time.Second)}} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev2, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } ev3 := &utils.CGREvent{ Tenant: "cgrates.org", @@ -687,8 +689,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { utils.Usage: 0}} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev3, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } tEv := &utils.CGREvent{ @@ -701,8 +703,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { }} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } tEv2 := &utils.CGREvent{ Tenant: "cgrates.org", @@ -714,8 +716,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { }} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv2, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } idx := map[string]utils.StringMap{ @@ -786,6 +788,8 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } + var reply []string + expected := []string{"Stats1"} tEv := &utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", @@ -793,10 +797,10 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { utils.EventType: utils.BalanceUpdate, utils.Account: "1002", }} - if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } fldNameVal := map[string]string{"TEST_PROFILE1": ""} @@ -860,11 +864,13 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) { utils.EventType: utils.AccountUpdate, utils.Account: "1003", }} + var ids []string + expected := []string{"Stats1"} if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, - tEv, &result); err != nil { + tEv, &ids); err != nil { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) + } else if !reflect.DeepEqual(ids, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, ids) } fldNameVal := map[string]string{"Stats1": ""} expectedRevIDX := map[string]utils.StringMap{ @@ -882,7 +888,8 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) { } func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { - var result string + var reply []string + expected := []string{"TEST_PROFILE1"} tEv := &utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", @@ -890,12 +897,12 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { utils.EventType: utils.BalanceUpdate, utils.Account: "1002", }} - if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } - + expected = []string{"Stats1"} tEv2 := &utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", @@ -903,11 +910,12 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { utils.EventType: utils.AccountUpdate, utils.Account: "1003", }} - if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err != nil { + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &reply); err != nil { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } + var result string //Remove threshold profile that was set form api if err := tFIdxCaRpc.Call("ApierV1.RemStatQueueProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &result); err != nil { @@ -936,24 +944,20 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { t.Error(err) } - if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err == nil || + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err == nil || + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) - } - fldNameVals := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} - if _, err = onStor.GetFilterReverseIndexes( - utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], "cgrates.org", - fldNameVals); err == nil || err != utils.ErrNotFound { - t.Error(err) } + // fldNameVals := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} + // if _, err = onStor.GetFilterReverseIndexes( + // utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], "cgrates.org", + // fldNameVals); err == nil || err != utils.ErrNotFound { + // t.Error(err) + // } } //AttributeProfile @@ -1569,22 +1573,21 @@ func testV1FIdxCaUpdateResourceProfile(t *testing.T) { } else if result != "MessageAllocation" { t.Error("Unexpected reply returned", result) } - /* - fldNameVal2 := map[string]string{"RCFG1": ""} - expectedRevIDX := map[string]utils.StringMap{ - "RCFG1": { - "*string:Account:2002": true, - "*string:Destination:2002": true, - "*string:Subject:2001": true}} - if indexes, err = onStor.GetFilterReverseIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - fldNameVal2); err != nil { - t.Error(err) - } - if !reflect.DeepEqual(expectedRevIDX, indexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) - } - */ + + // fldNameVal2 := map[string]string{"RCFG1": ""} + // expectedRevIDX := map[string]utils.StringMap{ + // "RCFG1": { + // "*string:Account:2002": true, + // "*string:Destination:2002": true, + // "*string:Subject:2001": true}} + // if indexes, err = onStor.GetFilterReverseIndexes( + // utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", + // fldNameVal2); err != nil { + // t.Error(err) + // } + // if !reflect.DeepEqual(expectedRevIDX, indexes) { + // t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) + // } } func testV1FIdxCaUpdateResourceProfileFromTP(t *testing.T) { diff --git a/apier/v1/sessions_thresholds_it_test.go b/apier/v1/sessions_thresholds_it_test.go index 7bd4eebc0..a81f9ffc2 100755 --- a/apier/v1/sessions_thresholds_it_test.go +++ b/apier/v1/sessions_thresholds_it_test.go @@ -51,6 +51,7 @@ func handleDisconnectSession2(clnt *rpc2.Client, } func TestSessionSv1ItInitCfg(t *testing.T) { + var err error sSv1CfgPath2 = path.Join(*dataDir, "conf", "samples", "sessions") // Init config first sSv1Cfg2, err = config.NewCGRConfigFromFolder(sSv1CfgPath2) diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index 8af7af406..3fe865dec 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -51,6 +51,7 @@ func handleDisconnectSession(clnt *rpc2.Client, } func TestSSv1ItInitCfg(t *testing.T) { + var err error sSv1CfgPath = path.Join(*dataDir, "conf", "samples", "sessions") // Init config first sSv1Cfg, err = config.NewCGRConfigFromFolder(sSv1CfgPath) diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 6312a7752..ef4c8012a 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -96,7 +96,7 @@ func (stsv1 *StatSv1) GetQueueIDs(tenant string, qIDs *[]string) error { } // ProcessEvent returns processes a new Event -func (stsv1 *StatSv1) ProcessEvent(ev *utils.CGREvent, reply *string) error { +func (stsv1 *StatSv1) ProcessEvent(ev *utils.CGREvent, reply *[]string) error { return stsv1.sS.V1ProcessEvent(ev, reply) } diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index b518b482e..cb6589893 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -112,7 +112,7 @@ func testV1STSLoadConfig(t *testing.T) { case "tutmongo": // Mongo needs more time to reset db, need to investigate statsDelay = 4000 default: - statsDelay = 1000 + statsDelay = 2000 } } @@ -142,7 +142,7 @@ func testV1STSFromFolder(t *testing.T) { if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error(err) } - time.Sleep(time.Duration(1000) * time.Millisecond) + time.Sleep(500 * time.Millisecond) } func testV1STSGetStats(t *testing.T) { @@ -173,7 +173,8 @@ func testV1STSGetStats(t *testing.T) { } func testV1STSProcessEvent(t *testing.T) { - var reply string + var reply []string + expected := []string{"Stats1"} ev1 := utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", @@ -185,8 +186,8 @@ func testV1STSProcessEvent(t *testing.T) { utils.PDD: time.Duration(12 * time.Second)}} if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } //process with one event (should be N/A becaus MinItems is 2) expectedMetrics := map[string]string{ @@ -200,7 +201,8 @@ func testV1STSProcessEvent(t *testing.T) { utils.MetaAverage: utils.NOT_AVAILABLE, } var metrics map[string]string - if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics); err != nil { + if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, + &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedMetrics, metrics) { t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) @@ -215,8 +217,8 @@ func testV1STSProcessEvent(t *testing.T) { utils.Usage: time.Duration(45 * time.Second)}} if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev2, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } ev3 := &utils.CGREvent{ Tenant: "cgrates.org", @@ -227,8 +229,8 @@ func testV1STSProcessEvent(t *testing.T) { utils.Usage: 0}} if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev3, &reply); err != nil { t.Error(err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } expectedMetrics2 := map[string]string{ utils.MetaASR: "66.66667%", @@ -259,7 +261,7 @@ func testV1STSGetStatsAfterRestart(t *testing.T) { if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) //get stats metrics after restart expectedMetrics2 := map[string]string{ @@ -278,7 +280,7 @@ func testV1STSGetStatsAfterRestart(t *testing.T) { } else if !reflect.DeepEqual(expectedMetrics2, metrics2) { t.Errorf("After restat expecting: %+v, received reply: %s", expectedMetrics2, metrics2) } - time.Sleep(time.Duration(1 * time.Second)) + time.Sleep(1 * time.Second) } func testV1STSSetStatQueueProfile(t *testing.T) { diff --git a/console/stats_process_event.go b/console/stats_process_event.go index 9c5c7c9ec..720879bdc 100644 --- a/console/stats_process_event.go +++ b/console/stats_process_event.go @@ -65,6 +65,6 @@ func (self *CmdStatQueueProcessEvent) PostprocessRpcParams() error { } func (self *CmdStatQueueProcessEvent) RpcResult() interface{} { - var atr string + var atr []string return &atr } diff --git a/engine/cdrs.go b/engine/cdrs.go index df76cc243..7822e505b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -208,7 +208,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { go self.cdrstats.Call("CDRStatsV1.AppendCDR", cdr, &out) } if self.stats != nil { - var reply string + var reply []string go self.stats.Call(utils.StatSv1ProcessEvent, cdr.AsCGREvent(), &reply) } if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR @@ -295,7 +295,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats, } } if self.stats != nil { - var reply string + var reply []string go self.stats.Call(utils.StatSv1ProcessEvent, ratedCDR.AsCGREvent(), &reply) } } diff --git a/engine/stats.go b/engine/stats.go index 1dc9832e7..494818fcf 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -214,16 +214,18 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf // processEvent processes a new event, dispatching to matching queues // queues matching are also cached to speed up -func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) { +func (sS *StatService) processEvent(ev *utils.CGREvent) (statQueueIDs []string, err error) { matchSQs, err := sS.matchingStatQueuesForEvent(ev) if err != nil { - return err + return nil, err } if len(matchSQs) == 0 { - return utils.ErrNotFound + return nil, utils.ErrNotFound } + var stsIDs []string var withErrors bool for _, sq := range matchSQs { + stsIDs = append(stsIDs, sq.ID) lkID := utils.StatQueuePrefix + sq.TenantID() guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) err = sq.ProcessEvent(ev) @@ -269,6 +271,11 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) { } } } + if len(stsIDs) != 0 { + statQueueIDs = append(statQueueIDs, stsIDs...) + } else { + statQueueIDs = []string{} + } if withErrors { err = utils.ErrPartiallyExecuted } @@ -276,14 +283,16 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) { } // V1ProcessEvent implements StatV1 method for processing an Event -func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *string) (err error) { +func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *[]string) (err error) { if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } else if ev.Event == nil { return utils.NewErrMandatoryIeMissing("Event") } - if err = sS.processEvent(ev); err == nil { - *reply = utils.OK + if ids, err := sS.processEvent(ev); err != nil { + return err + } else { + *reply = ids } return } diff --git a/engine/stats_test.go b/engine/stats_test.go index 38db5358c..507933af8 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -314,42 +314,46 @@ func TestStatsmatchingStatQueuesForEvent(t *testing.T) { func TestStatSprocessEvent(t *testing.T) { stq := map[string]string{} - reply := "" + reply := []string{} + expected := []string{"statsprofile1"} err := stsserv.V1ProcessEvent(statsEvs[0], &reply) if err != nil { t.Errorf("Error: %+v", err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[0].Tenant, ID: stqs[0].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } + expected = []string{"statsprofile2"} err = stsserv.V1ProcessEvent(statsEvs[1], &reply) if err != nil { t.Errorf("Error: %+v", err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[1].Tenant, ID: stqs[1].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } + expected = []string{"statsprofile3"} err = stsserv.V1ProcessEvent(statsEvs[2], &reply) if err != nil { t.Errorf("Error: %+v", err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[2].Tenant, ID: stqs[2].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } + expected = []string{"statsprofile4"} err = stsserv.V1ProcessEvent(statsEvs[3], &reply) if err != nil { t.Errorf("Error: %+v", err) - } else if reply != utils.OK { - t.Errorf("received reply: %s", reply) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) } err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[3].Tenant, ID: stqs[3].ID}, &stq) if err != nil { diff --git a/sessions/sessions.go b/sessions/sessions.go index 1b7c28c15..ffcff8d46 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1322,6 +1322,7 @@ type V1AuthorizeReply struct { MaxUsage *time.Duration Suppliers *engine.SortedSuppliers ThresholdIDs *[]string + StatQueueIDs *[]string } // AsCGRReply is part of utils.CGRReplier interface @@ -1346,7 +1347,10 @@ func (v1AuthReply *V1AuthorizeReply) AsCGRReply() (cgrReply utils.CGRReply, err cgrReply[utils.CapSuppliers] = v1AuthReply.Suppliers.Digest() } if v1AuthReply.ThresholdIDs != nil { - cgrReply[utils.CapThresholdHits] = *v1AuthReply.ThresholdIDs + cgrReply[utils.CapThresholds] = *v1AuthReply.ThresholdIDs + } + if v1AuthReply.StatQueueIDs != nil { + cgrReply[utils.CapStatQueues] = *v1AuthReply.StatQueueIDs } cgrReply[utils.Error] = "" // so we can compare in filters return @@ -1461,12 +1465,13 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, if smg.statS == nil { return utils.NewErrNotConnected(utils.StatService) } - var statReply string + var statReply []string if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing event %+v with StatS.", err.Error(), args.CGREvent)) } + authReply.StatQueueIDs = &statReply } return nil } @@ -1477,6 +1482,7 @@ type V1AuthorizeReplyWithDigest struct { MaxUsage *float64 // special treat returning time.Duration.Seconds() SuppliersDigest *string Thresholds *string + StatQueues *string } // BiRPCv1AuthorizeEventWithDigest performs authorization for CGREvent based on specific components @@ -1506,10 +1512,14 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEventWithDigest(clnt rpcclient.RpcClientCo if args.GetSuppliers { authReply.SuppliersDigest = utils.StringPointer(initAuthRply.Suppliers.Digest()) } - if *args.ProcessThresholds { + if args.ProcessThresholds != nil && *args.ProcessThresholds { authReply.Thresholds = utils.StringPointer( strings.Join(*initAuthRply.ThresholdIDs, utils.FIELDS_SEP)) } + if args.ProcessStatQueues != nil && *args.ProcessStatQueues { + authReply.StatQueues = utils.StringPointer( + strings.Join(*initAuthRply.StatQueueIDs, utils.FIELDS_SEP)) + } return nil } @@ -1527,6 +1537,7 @@ type V1InitSessionReply struct { ResourceAllocation *string MaxUsage *time.Duration ThresholdIDs *[]string + StatQueueIDs *[]string } // AsCGRReply is part of utils.CGRReplier interface @@ -1548,7 +1559,10 @@ func (v1Rply *V1InitSessionReply) AsCGRReply() (cgrReply utils.CGRReply, err err cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage } if v1Rply.ThresholdIDs != nil { - cgrReply[utils.CapThresholdHits] = *v1Rply.ThresholdIDs + cgrReply[utils.CapThresholds] = *v1Rply.ThresholdIDs + } + if v1Rply.StatQueueIDs != nil { + cgrReply[utils.CapStatQueues] = *v1Rply.StatQueueIDs } cgrReply[utils.Error] = "" return @@ -1639,12 +1653,13 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, if smg.statS == nil { return utils.NewErrNotConnected(utils.StatService) } - var statReply string + var statReply []string if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing event %+v with StatS.", err.Error(), args.CGREvent)) } + rply.StatQueueIDs = &statReply } return } @@ -1654,6 +1669,7 @@ type V1InitReplyWithDigest struct { ResourceAllocation *string MaxUsage *float64 Thresholds *string + StatQueues *string } func (smg *SMGeneric) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientConnection, @@ -1682,10 +1698,14 @@ func (smg *SMGeneric) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientC } } - if *args.ProcessThresholds { + if args.ProcessThresholds != nil && *args.ProcessThresholds { initReply.Thresholds = utils.StringPointer( strings.Join(*initSessionRply.ThresholdIDs, utils.FIELDS_SEP)) } + if args.ProcessStatQueues != nil && *args.ProcessStatQueues { + initReply.StatQueues = utils.StringPointer( + strings.Join(*initSessionRply.StatQueueIDs, utils.FIELDS_SEP)) + } return nil } @@ -1833,7 +1853,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection if smg.statS == nil { return utils.NewErrNotConnected(utils.StatService) } - var statReply string + var statReply []string if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( diff --git a/utils/consts.go b/utils/consts.go index d57bf32eb..0bde08874 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -614,7 +614,8 @@ const ( CapResourceAllocation = "ResourceAllocation" CapMaxUsage = "MaxUsage" CapSuppliers = "Suppliers" - CapThresholdHits = "ThresholdHits" + CapThresholds = "Thresholds" + CapStatQueues = "StatQueues" ) // MetaFilterIndexesAPIs