mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
Stats Process Event return now StatQueueIDs
This commit is contained in:
committed by
Dan Christian Bogos
parent
b8472ac71d
commit
dbff560dfb
@@ -273,6 +273,9 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
|
||||
if authArgs.ProcessThresholds != nil && *authArgs.ProcessThresholds {
|
||||
kar.Thresholds = strings.Join(*authReply.ThresholdIDs, utils.FIELDS_SEP)
|
||||
}
|
||||
if authArgs.ProcessStatQueues != nil && *authArgs.ProcessStatQueues {
|
||||
kar.StatQueues = strings.Join(*authReply.StatQueueIDs, utils.FIELDS_SEP)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -346,6 +349,7 @@ type KamAuthReply struct {
|
||||
MaxUsage int // Maximum session time in case of success, -1 for unlimited
|
||||
Suppliers string // List of suppliers, comma separated
|
||||
Thresholds string
|
||||
StatQueues string
|
||||
Error string // Reply in case of error
|
||||
}
|
||||
|
||||
|
||||
@@ -559,7 +559,7 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) {
|
||||
|
||||
//StatQueue
|
||||
func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) {
|
||||
var reply *string
|
||||
var reply *[]string
|
||||
tEv := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
@@ -571,8 +571,6 @@ func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) {
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil ||
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
} else if reply != nil && *reply != "" {
|
||||
t.Error("Unexpected reply returned", *reply)
|
||||
}
|
||||
if indexes, err = onStor.GetFilterReverseIndexes(
|
||||
utils.PrefixToIndexCache[utils.StatQueueProfilePrefix], "cgrates.org",
|
||||
@@ -603,6 +601,7 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) {
|
||||
},
|
||||
}
|
||||
var result string
|
||||
|
||||
if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
@@ -643,11 +642,13 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) {
|
||||
utils.Account: "1001",
|
||||
},
|
||||
}
|
||||
var reply []string
|
||||
expected := []string{"TEST_PROFILE1"}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent,
|
||||
tEv, &result); err != nil {
|
||||
tEv, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
|
||||
fldNameVal := map[string]string{"TEST_PROFILE1": ""}
|
||||
@@ -665,7 +666,8 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) {
|
||||
}
|
||||
|
||||
func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) {
|
||||
var reply string
|
||||
var reply []string
|
||||
expected := []string{"Stats1"}
|
||||
ev2 := utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event2",
|
||||
@@ -675,8 +677,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) {
|
||||
utils.Usage: time.Duration(45 * time.Second)}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev2, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
ev3 := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -687,8 +689,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) {
|
||||
utils.Usage: 0}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev3, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
|
||||
tEv := &utils.CGREvent{
|
||||
@@ -701,8 +703,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) {
|
||||
}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
tEv2 := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -714,8 +716,8 @@ func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) {
|
||||
}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv2, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
|
||||
idx := map[string]utils.StringMap{
|
||||
@@ -786,6 +788,8 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) {
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
var reply []string
|
||||
expected := []string{"Stats1"}
|
||||
tEv := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
@@ -793,10 +797,10 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) {
|
||||
utils.EventType: utils.BalanceUpdate,
|
||||
utils.Account: "1002",
|
||||
}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
|
||||
fldNameVal := map[string]string{"TEST_PROFILE1": ""}
|
||||
@@ -860,11 +864,13 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) {
|
||||
utils.EventType: utils.AccountUpdate,
|
||||
utils.Account: "1003",
|
||||
}}
|
||||
var ids []string
|
||||
expected := []string{"Stats1"}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent,
|
||||
tEv, &result); err != nil {
|
||||
tEv, &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
} else if !reflect.DeepEqual(ids, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, ids)
|
||||
}
|
||||
fldNameVal := map[string]string{"Stats1": ""}
|
||||
expectedRevIDX := map[string]utils.StringMap{
|
||||
@@ -882,7 +888,8 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) {
|
||||
}
|
||||
|
||||
func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) {
|
||||
var result string
|
||||
var reply []string
|
||||
expected := []string{"TEST_PROFILE1"}
|
||||
tEv := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
@@ -890,12 +897,12 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) {
|
||||
utils.EventType: utils.BalanceUpdate,
|
||||
utils.Account: "1002",
|
||||
}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
|
||||
expected = []string{"Stats1"}
|
||||
tEv2 := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
@@ -903,11 +910,12 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) {
|
||||
utils.EventType: utils.AccountUpdate,
|
||||
utils.Account: "1003",
|
||||
}}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
var result string
|
||||
//Remove threshold profile that was set form api
|
||||
if err := tFIdxCaRpc.Call("ApierV1.RemStatQueueProfile",
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &result); err != nil {
|
||||
@@ -936,24 +944,20 @@ func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err == nil ||
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil ||
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err == nil ||
|
||||
if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &reply); err == nil ||
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
fldNameVals := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""}
|
||||
if _, err = onStor.GetFilterReverseIndexes(
|
||||
utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], "cgrates.org",
|
||||
fldNameVals); err == nil || err != utils.ErrNotFound {
|
||||
t.Error(err)
|
||||
}
|
||||
// fldNameVals := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""}
|
||||
// if _, err = onStor.GetFilterReverseIndexes(
|
||||
// utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], "cgrates.org",
|
||||
// fldNameVals); err == nil || err != utils.ErrNotFound {
|
||||
// t.Error(err)
|
||||
// }
|
||||
}
|
||||
|
||||
//AttributeProfile
|
||||
@@ -1569,22 +1573,21 @@ func testV1FIdxCaUpdateResourceProfile(t *testing.T) {
|
||||
} else if result != "MessageAllocation" {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
/*
|
||||
fldNameVal2 := map[string]string{"RCFG1": ""}
|
||||
expectedRevIDX := map[string]utils.StringMap{
|
||||
"RCFG1": {
|
||||
"*string:Account:2002": true,
|
||||
"*string:Destination:2002": true,
|
||||
"*string:Subject:2001": true}}
|
||||
if indexes, err = onStor.GetFilterReverseIndexes(
|
||||
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
|
||||
fldNameVal2); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if !reflect.DeepEqual(expectedRevIDX, indexes) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes))
|
||||
}
|
||||
*/
|
||||
|
||||
// fldNameVal2 := map[string]string{"RCFG1": ""}
|
||||
// expectedRevIDX := map[string]utils.StringMap{
|
||||
// "RCFG1": {
|
||||
// "*string:Account:2002": true,
|
||||
// "*string:Destination:2002": true,
|
||||
// "*string:Subject:2001": true}}
|
||||
// if indexes, err = onStor.GetFilterReverseIndexes(
|
||||
// utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
|
||||
// fldNameVal2); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
// if !reflect.DeepEqual(expectedRevIDX, indexes) {
|
||||
// t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes))
|
||||
// }
|
||||
}
|
||||
|
||||
func testV1FIdxCaUpdateResourceProfileFromTP(t *testing.T) {
|
||||
|
||||
@@ -51,6 +51,7 @@ func handleDisconnectSession2(clnt *rpc2.Client,
|
||||
}
|
||||
|
||||
func TestSessionSv1ItInitCfg(t *testing.T) {
|
||||
var err error
|
||||
sSv1CfgPath2 = path.Join(*dataDir, "conf", "samples", "sessions")
|
||||
// Init config first
|
||||
sSv1Cfg2, err = config.NewCGRConfigFromFolder(sSv1CfgPath2)
|
||||
|
||||
@@ -51,6 +51,7 @@ func handleDisconnectSession(clnt *rpc2.Client,
|
||||
}
|
||||
|
||||
func TestSSv1ItInitCfg(t *testing.T) {
|
||||
var err error
|
||||
sSv1CfgPath = path.Join(*dataDir, "conf", "samples", "sessions")
|
||||
// Init config first
|
||||
sSv1Cfg, err = config.NewCGRConfigFromFolder(sSv1CfgPath)
|
||||
|
||||
@@ -96,7 +96,7 @@ func (stsv1 *StatSv1) GetQueueIDs(tenant string, qIDs *[]string) error {
|
||||
}
|
||||
|
||||
// ProcessEvent returns processes a new Event
|
||||
func (stsv1 *StatSv1) ProcessEvent(ev *utils.CGREvent, reply *string) error {
|
||||
func (stsv1 *StatSv1) ProcessEvent(ev *utils.CGREvent, reply *[]string) error {
|
||||
return stsv1.sS.V1ProcessEvent(ev, reply)
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ func testV1STSLoadConfig(t *testing.T) {
|
||||
case "tutmongo": // Mongo needs more time to reset db, need to investigate
|
||||
statsDelay = 4000
|
||||
default:
|
||||
statsDelay = 1000
|
||||
statsDelay = 2000
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ func testV1STSFromFolder(t *testing.T) {
|
||||
if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
func testV1STSGetStats(t *testing.T) {
|
||||
@@ -173,7 +173,8 @@ func testV1STSGetStats(t *testing.T) {
|
||||
}
|
||||
|
||||
func testV1STSProcessEvent(t *testing.T) {
|
||||
var reply string
|
||||
var reply []string
|
||||
expected := []string{"Stats1"}
|
||||
ev1 := utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
@@ -185,8 +186,8 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
utils.PDD: time.Duration(12 * time.Second)}}
|
||||
if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
//process with one event (should be N/A becaus MinItems is 2)
|
||||
expectedMetrics := map[string]string{
|
||||
@@ -200,7 +201,8 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
utils.MetaAverage: utils.NOT_AVAILABLE,
|
||||
}
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics); err != nil {
|
||||
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
|
||||
@@ -215,8 +217,8 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
utils.Usage: time.Duration(45 * time.Second)}}
|
||||
if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev2, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
ev3 := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -227,8 +229,8 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
utils.Usage: 0}}
|
||||
if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &ev3, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
expectedMetrics2 := map[string]string{
|
||||
utils.MetaASR: "66.66667%",
|
||||
@@ -259,7 +261,7 @@ func testV1STSGetStatsAfterRestart(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
//get stats metrics after restart
|
||||
expectedMetrics2 := map[string]string{
|
||||
@@ -278,7 +280,7 @@ func testV1STSGetStatsAfterRestart(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(expectedMetrics2, metrics2) {
|
||||
t.Errorf("After restat expecting: %+v, received reply: %s", expectedMetrics2, metrics2)
|
||||
}
|
||||
time.Sleep(time.Duration(1 * time.Second))
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
func testV1STSSetStatQueueProfile(t *testing.T) {
|
||||
|
||||
@@ -65,6 +65,6 @@ func (self *CmdStatQueueProcessEvent) PostprocessRpcParams() error {
|
||||
}
|
||||
|
||||
func (self *CmdStatQueueProcessEvent) RpcResult() interface{} {
|
||||
var atr string
|
||||
var atr []string
|
||||
return &atr
|
||||
}
|
||||
|
||||
@@ -208,7 +208,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
|
||||
go self.cdrstats.Call("CDRStatsV1.AppendCDR", cdr, &out)
|
||||
}
|
||||
if self.stats != nil {
|
||||
var reply string
|
||||
var reply []string
|
||||
go self.stats.Call(utils.StatSv1ProcessEvent, cdr.AsCGREvent(), &reply)
|
||||
}
|
||||
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
|
||||
@@ -295,7 +295,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats,
|
||||
}
|
||||
}
|
||||
if self.stats != nil {
|
||||
var reply string
|
||||
var reply []string
|
||||
go self.stats.Call(utils.StatSv1ProcessEvent, ratedCDR.AsCGREvent(), &reply)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,16 +214,18 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf
|
||||
|
||||
// processEvent processes a new event, dispatching to matching queues
|
||||
// queues matching are also cached to speed up
|
||||
func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
|
||||
func (sS *StatService) processEvent(ev *utils.CGREvent) (statQueueIDs []string, err error) {
|
||||
matchSQs, err := sS.matchingStatQueuesForEvent(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if len(matchSQs) == 0 {
|
||||
return utils.ErrNotFound
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
var stsIDs []string
|
||||
var withErrors bool
|
||||
for _, sq := range matchSQs {
|
||||
stsIDs = append(stsIDs, sq.ID)
|
||||
lkID := utils.StatQueuePrefix + sq.TenantID()
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
|
||||
err = sq.ProcessEvent(ev)
|
||||
@@ -269,6 +271,11 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(stsIDs) != 0 {
|
||||
statQueueIDs = append(statQueueIDs, stsIDs...)
|
||||
} else {
|
||||
statQueueIDs = []string{}
|
||||
}
|
||||
if withErrors {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
@@ -276,14 +283,16 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
|
||||
}
|
||||
|
||||
// V1ProcessEvent implements StatV1 method for processing an Event
|
||||
func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *string) (err error) {
|
||||
func (sS *StatService) V1ProcessEvent(ev *utils.CGREvent, reply *[]string) (err error) {
|
||||
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
} else if ev.Event == nil {
|
||||
return utils.NewErrMandatoryIeMissing("Event")
|
||||
}
|
||||
if err = sS.processEvent(ev); err == nil {
|
||||
*reply = utils.OK
|
||||
if ids, err := sS.processEvent(ev); err != nil {
|
||||
return err
|
||||
} else {
|
||||
*reply = ids
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -314,42 +314,46 @@ func TestStatsmatchingStatQueuesForEvent(t *testing.T) {
|
||||
|
||||
func TestStatSprocessEvent(t *testing.T) {
|
||||
stq := map[string]string{}
|
||||
reply := ""
|
||||
reply := []string{}
|
||||
expected := []string{"statsprofile1"}
|
||||
err := stsserv.V1ProcessEvent(statsEvs[0], &reply)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[0].Tenant, ID: stqs[0].ID}, &stq)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
expected = []string{"statsprofile2"}
|
||||
err = stsserv.V1ProcessEvent(statsEvs[1], &reply)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[1].Tenant, ID: stqs[1].ID}, &stq)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
expected = []string{"statsprofile3"}
|
||||
err = stsserv.V1ProcessEvent(statsEvs[2], &reply)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[2].Tenant, ID: stqs[2].ID}, &stq)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
expected = []string{"statsprofile4"}
|
||||
err = stsserv.V1ProcessEvent(statsEvs[3], &reply)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
} else if !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
|
||||
}
|
||||
err = stsserv.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[3].Tenant, ID: stqs[3].ID}, &stq)
|
||||
if err != nil {
|
||||
|
||||
@@ -1322,6 +1322,7 @@ type V1AuthorizeReply struct {
|
||||
MaxUsage *time.Duration
|
||||
Suppliers *engine.SortedSuppliers
|
||||
ThresholdIDs *[]string
|
||||
StatQueueIDs *[]string
|
||||
}
|
||||
|
||||
// AsCGRReply is part of utils.CGRReplier interface
|
||||
@@ -1346,7 +1347,10 @@ func (v1AuthReply *V1AuthorizeReply) AsCGRReply() (cgrReply utils.CGRReply, err
|
||||
cgrReply[utils.CapSuppliers] = v1AuthReply.Suppliers.Digest()
|
||||
}
|
||||
if v1AuthReply.ThresholdIDs != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1AuthReply.ThresholdIDs
|
||||
cgrReply[utils.CapThresholds] = *v1AuthReply.ThresholdIDs
|
||||
}
|
||||
if v1AuthReply.StatQueueIDs != nil {
|
||||
cgrReply[utils.CapStatQueues] = *v1AuthReply.StatQueueIDs
|
||||
}
|
||||
cgrReply[utils.Error] = "" // so we can compare in filters
|
||||
return
|
||||
@@ -1461,12 +1465,13 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatService)
|
||||
}
|
||||
var statReply string
|
||||
var statReply []string
|
||||
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with StatS.", err.Error(), args.CGREvent))
|
||||
}
|
||||
authReply.StatQueueIDs = &statReply
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1477,6 +1482,7 @@ type V1AuthorizeReplyWithDigest struct {
|
||||
MaxUsage *float64 // special treat returning time.Duration.Seconds()
|
||||
SuppliersDigest *string
|
||||
Thresholds *string
|
||||
StatQueues *string
|
||||
}
|
||||
|
||||
// BiRPCv1AuthorizeEventWithDigest performs authorization for CGREvent based on specific components
|
||||
@@ -1506,10 +1512,14 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEventWithDigest(clnt rpcclient.RpcClientCo
|
||||
if args.GetSuppliers {
|
||||
authReply.SuppliersDigest = utils.StringPointer(initAuthRply.Suppliers.Digest())
|
||||
}
|
||||
if *args.ProcessThresholds {
|
||||
if args.ProcessThresholds != nil && *args.ProcessThresholds {
|
||||
authReply.Thresholds = utils.StringPointer(
|
||||
strings.Join(*initAuthRply.ThresholdIDs, utils.FIELDS_SEP))
|
||||
}
|
||||
if args.ProcessStatQueues != nil && *args.ProcessStatQueues {
|
||||
authReply.StatQueues = utils.StringPointer(
|
||||
strings.Join(*initAuthRply.StatQueueIDs, utils.FIELDS_SEP))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1527,6 +1537,7 @@ type V1InitSessionReply struct {
|
||||
ResourceAllocation *string
|
||||
MaxUsage *time.Duration
|
||||
ThresholdIDs *[]string
|
||||
StatQueueIDs *[]string
|
||||
}
|
||||
|
||||
// AsCGRReply is part of utils.CGRReplier interface
|
||||
@@ -1548,7 +1559,10 @@ func (v1Rply *V1InitSessionReply) AsCGRReply() (cgrReply utils.CGRReply, err err
|
||||
cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage
|
||||
}
|
||||
if v1Rply.ThresholdIDs != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1Rply.ThresholdIDs
|
||||
cgrReply[utils.CapThresholds] = *v1Rply.ThresholdIDs
|
||||
}
|
||||
if v1Rply.StatQueueIDs != nil {
|
||||
cgrReply[utils.CapStatQueues] = *v1Rply.StatQueueIDs
|
||||
}
|
||||
cgrReply[utils.Error] = ""
|
||||
return
|
||||
@@ -1639,12 +1653,13 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatService)
|
||||
}
|
||||
var statReply string
|
||||
var statReply []string
|
||||
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with StatS.", err.Error(), args.CGREvent))
|
||||
}
|
||||
rply.StatQueueIDs = &statReply
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1654,6 +1669,7 @@ type V1InitReplyWithDigest struct {
|
||||
ResourceAllocation *string
|
||||
MaxUsage *float64
|
||||
Thresholds *string
|
||||
StatQueues *string
|
||||
}
|
||||
|
||||
func (smg *SMGeneric) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientConnection,
|
||||
@@ -1682,10 +1698,14 @@ func (smg *SMGeneric) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientC
|
||||
}
|
||||
}
|
||||
|
||||
if *args.ProcessThresholds {
|
||||
if args.ProcessThresholds != nil && *args.ProcessThresholds {
|
||||
initReply.Thresholds = utils.StringPointer(
|
||||
strings.Join(*initSessionRply.ThresholdIDs, utils.FIELDS_SEP))
|
||||
}
|
||||
if args.ProcessStatQueues != nil && *args.ProcessStatQueues {
|
||||
initReply.StatQueues = utils.StringPointer(
|
||||
strings.Join(*initSessionRply.StatQueueIDs, utils.FIELDS_SEP))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1833,7 +1853,7 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
|
||||
if smg.statS == nil {
|
||||
return utils.NewErrNotConnected(utils.StatService)
|
||||
}
|
||||
var statReply string
|
||||
var statReply []string
|
||||
if err := smg.statS.Call(utils.StatSv1ProcessEvent, &args.CGREvent, &statReply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -614,7 +614,8 @@ const (
|
||||
CapResourceAllocation = "ResourceAllocation"
|
||||
CapMaxUsage = "MaxUsage"
|
||||
CapSuppliers = "Suppliers"
|
||||
CapThresholdHits = "ThresholdHits"
|
||||
CapThresholds = "Thresholds"
|
||||
CapStatQueues = "StatQueues"
|
||||
)
|
||||
|
||||
// MetaFilterIndexesAPIs
|
||||
|
||||
Reference in New Issue
Block a user