From 0bdbe2df4da70144941ca5edf80edcb746481cdc Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 14 Jul 2021 12:49:57 +0300 Subject: [PATCH] Updated caps birpc server --- cores/caps.go | 3 + .../dispatchers/all_mysql/cgrates.json | 2 +- data/conf/samples/ees/cgrates.json | 2 +- dispatchers/attributes_it_test.go | 8 +- dispatchers/sessions_it_test.go | 8 +- ees/nats_it_test.go | 8 +- engine/libstats.go | 79 ++++++++++++++----- engine/libstats_test.go | 25 +++++- general_tests/filters_it_test.go | 2 +- 9 files changed, 103 insertions(+), 34 deletions(-) diff --git a/cores/caps.go b/cores/caps.go index 78bbe90ef..ee4534ced 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -144,6 +144,9 @@ func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.Analyzer } func newCapsBiRPCCodec(sc birpc.BirpcCodec, caps *engine.Caps) birpc.BirpcCodec { + if !caps.IsLimited() { + return sc + } return &capsBiRPCCodec{ sc: sc, caps: caps, diff --git a/data/conf/samples/dispatchers/all_mysql/cgrates.json b/data/conf/samples/dispatchers/all_mysql/cgrates.json index c9fbcc18c..7ade44aa9 100644 --- a/data/conf/samples/dispatchers/all_mysql/cgrates.json +++ b/data/conf/samples/dispatchers/all_mysql/cgrates.json @@ -10,7 +10,7 @@ "listen": { - "rpc_json": ":6012", + "rpc_json": ":6012", "rpc_gob": ":6013", "http": ":6080", }, diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 109506799..9135e3362 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -414,7 +414,7 @@ ] }, { - "id": "HTTPJsonMapExporter", + "id": "NatsJsonMapExporter", "type": "*natsJSONMap", "export_path": "nats://localhost:4222", "attempts": 1, diff --git a/dispatchers/attributes_it_test.go b/dispatchers/attributes_it_test.go index 05b15317e..3f2b26261 100644 --- a/dispatchers/attributes_it_test.go +++ b/dispatchers/attributes_it_test.go @@ -244,7 +244,7 @@ func testDspAttrGetAttrFailover(t *testing.T) { } eRply := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1002_SIMPLEAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1002_SIMPLEAUTH"}, AlteredFields: []string{"*req.Password"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", @@ -447,7 +447,7 @@ func testDspAttrTestAuthKey2(t *testing.T) { } eRply := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1001_SIMPLEAUTH"}, AlteredFields: []string{"*req.Password"}, CGREvent: &utils.CGREvent{ @@ -530,7 +530,7 @@ func testDspAttrGetAttrRoundRobin(t *testing.T) { } eRply := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1002_SIMPLEAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1002_SIMPLEAUTH"}, AlteredFields: []string{"*req.Password"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", @@ -602,7 +602,7 @@ func testDspAttrGetAttrInternal(t *testing.T) { } eRply := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1003_SIMPLEAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1003_SIMPLEAUTH"}, AlteredFields: []string{"*req.Password"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", diff --git a/dispatchers/sessions_it_test.go b/dispatchers/sessions_it_test.go index 94483d614..482805926 100644 --- a/dispatchers/sessions_it_test.go +++ b/dispatchers/sessions_it_test.go @@ -379,7 +379,7 @@ func testDspSessionUpdate(t *testing.T) { t.Error(err) } eAttrs := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_ACNT_1001"}, + MatchedProfiles: []string{"cgrates.org:ATTR_ACNT_1001"}, AlteredFields: []string{"*req.OfficeGroup"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", @@ -450,7 +450,7 @@ func testDspSessionUpdate2(t *testing.T) { t.Fatal(err) } eAttrs := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1001_SESSIONAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1001_SESSIONAUTH"}, AlteredFields: []string{"*req.LCRProfile", "*req.Password", "*req.RequestType", "*req.PaypalAccount"}, CGREvent: &utils.CGREvent{ @@ -605,7 +605,7 @@ func testDspSessionProcessEvent(t *testing.T) { t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation) } eAttrs := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_ACNT_1001"}, + MatchedProfiles: []string{"cgrates.org:ATTR_ACNT_1001"}, AlteredFields: []string{"*req.OfficeGroup"}, CGREvent: &utils.CGREvent{ @@ -684,7 +684,7 @@ func testDspSessionProcessEvent2(t *testing.T) { t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation) } eAttrs := &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"}, + MatchedProfiles: []string{"cgrates.org:ATTR_1001_SIMPLEAUTH"}, AlteredFields: []string{"*req.EventName", "*req.Password"}, CGREvent: &utils.CGREvent{ diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index f58eb96a7..87f4eb49c 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -46,7 +46,13 @@ func TestNatsEE(t *testing.T) { if err != nil { t.Fatal(err) } - evExp, err := NewEventExporter(cfg, 5, new(engine.FilterS)) + var idx int + for idx = range cfg.EEsCfg().Exporters { + if cfg.EEsCfg().Exporters[idx].ID == "NatsJsonMapExporter" { + break + } + } + evExp, err := NewEventExporter(cfg, idx, new(engine.FilterS)) if err != nil { t.Fatal(err) } diff --git a/engine/libstats.go b/engine/libstats.go index 55ddf7d33..18c2e2339 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -346,25 +346,14 @@ func (sis StatQueues) Sort() { sort.Slice(sis, func(i, j int) bool { return sis[i].sqPrfl.Weight > sis[j].sqPrfl.Weight }) } -type encStatQueue struct { - Tenant string - ID string - SQItems []SQItem - SQMetrics map[string]StatMetric -} - func (sq *StatQueue) MarshalJSON() (rply []byte, err error) { if sq == nil { return []byte("null"), nil } + type tmp StatQueue guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) { sq.RLock() - rply, err = json.Marshal(&encStatQueue{ - Tenant: sq.Tenant, - ID: sq.ID, - SQItems: sq.SQItems, - SQMetrics: sq.SQMetrics, - }) + rply, err = json.Marshal(tmp(*sq)) sq.RUnlock() return }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sq.TenantID()) @@ -414,7 +403,6 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) { return fmt.Errorf("unsupported metric type <%s>", metricSplit[0]) } if err = json.Unmarshal([]byte(val), metric); err != nil { - fmt.Println(1) return } sq.SQMetrics[metricID] = metric @@ -424,14 +412,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) { func (sq *StatQueue) GobEncode() (rply []byte, err error) { buf := bytes.NewBuffer(rply) + type tmp StatQueue guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) { sq.RLock() - err = gob.NewEncoder(buf).Encode(&encStatQueue{ - Tenant: sq.Tenant, - ID: sq.ID, - SQItems: sq.SQItems, - SQMetrics: sq.SQMetrics, - }) + err = gob.NewEncoder(buf).Encode(tmp(*sq)) sq.RUnlock() return }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sq.TenantID()) @@ -458,3 +442,58 @@ func (sq *StatQueue) Clone() (cln *StatQueue) { } return } + +func (ssq *StatQueueWithAPIOpts) MarshalJSON() (rply []byte, err error) { + if ssq == nil { + return []byte("null"), nil + } + type tmp struct { + StatQueue + APIOpts map[string]interface{} + } + guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) { + ssq.RLock() + rply, err = json.Marshal(tmp{ + StatQueue: *ssq.StatQueue, + APIOpts: ssq.APIOpts, + }) + ssq.RUnlock() + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+ssq.TenantID()) + return +} + +func (ssq *StatQueueWithAPIOpts) GobEncode() (rply []byte, err error) { + buf := bytes.NewBuffer(rply) + type tmp struct { + StatQueue + APIOpts map[string]interface{} + } + guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) { + ssq.RLock() + err = gob.NewEncoder(buf).Encode(tmp{ + StatQueue: *ssq.StatQueue, + APIOpts: ssq.APIOpts, + }) + ssq.RUnlock() + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+ssq.TenantID()) + return buf.Bytes(), nil +} + +// UnmarshalJSON here only to fully support json for StatQueue +func (ssq *StatQueueWithAPIOpts) UnmarshalJSON(data []byte) (err error) { + sq := new(StatQueue) + if err = json.Unmarshal(data, &sq); err != nil { + return + } + i := struct { + APIOpts map[string]interface{} + }{} + if err = json.Unmarshal(data, &i); err != nil { + return + } + ssq.StatQueue = sq + ssq.APIOpts = i.APIOpts + return +} diff --git a/engine/libstats_test.go b/engine/libstats_test.go index 4d60fbc7e..7b7309d29 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -697,7 +697,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) { } func TestStatQueueJSONMarshall(t *testing.T) { - rply := new(StatQueue) + var rply *StatQueue exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{ {MetricID: utils.MetaASR}, {MetricID: utils.MetaTCD}, @@ -705,10 +705,31 @@ func TestStatQueueJSONMarshall(t *testing.T) { if err != nil { t.Fatal(err) } - if err = json.Unmarshal([]byte(utils.ToJSON(exp)), rply); err != nil { + if err = json.Unmarshal([]byte(utils.ToJSON(exp)), &rply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(rply, exp) { t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp), utils.ToJSON(rply)) } } + +func TestStatQueueWithAPIOptsJSONMarshall(t *testing.T) { + rply := &StatQueueWithAPIOpts{} + exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{ + {MetricID: utils.MetaASR}, + {MetricID: utils.MetaTCD}, + }, 1) + exp2 := &StatQueueWithAPIOpts{ + StatQueue: exp, + APIOpts: map[string]interface{}{"a": "a"}, + } + if err != nil { + t.Fatal(err) + } + if err = json.Unmarshal([]byte(utils.ToJSON(exp2)), rply); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rply, exp2) { + t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp2), utils.ToJSON(rply)) + } + +} diff --git a/general_tests/filters_it_test.go b/general_tests/filters_it_test.go index 708832540..752f86a75 100644 --- a/general_tests/filters_it_test.go +++ b/general_tests/filters_it_test.go @@ -1018,7 +1018,7 @@ func testV1FltrAttributesPrefix(t *testing.T) { processedEv := &engine.AttrSProcessEventReply{ AlteredFields: []string{"*req.CustomField"}, - MatchedProfiles: []string{"cgrates.org:ATTR_1001"}, + MatchedProfiles: []string{"cgrates.new:ATTR_1001"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.new",