Updated caps birpc server

This commit is contained in:
Trial97
2021-07-14 12:49:57 +03:00
committed by Dan Christian Bogos
parent 7a2d234108
commit 0bdbe2df4d
9 changed files with 103 additions and 34 deletions

View File

@@ -144,6 +144,9 @@ func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.Analyzer
}
func newCapsBiRPCCodec(sc birpc.BirpcCodec, caps *engine.Caps) birpc.BirpcCodec {
if !caps.IsLimited() {
return sc
}
return &capsBiRPCCodec{
sc: sc,
caps: caps,

View File

@@ -10,7 +10,7 @@
"listen": {
"rpc_json": ":6012",
"rpc_json": ":6012",
"rpc_gob": ":6013",
"http": ":6080",
},

View File

@@ -414,7 +414,7 @@
]
},
{
"id": "HTTPJsonMapExporter",
"id": "NatsJsonMapExporter",
"type": "*natsJSONMap",
"export_path": "nats://localhost:4222",
"attempts": 1,

View File

@@ -244,7 +244,7 @@ func testDspAttrGetAttrFailover(t *testing.T) {
}
eRply := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1002_SIMPLEAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1002_SIMPLEAUTH"},
AlteredFields: []string{"*req.Password"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
@@ -447,7 +447,7 @@ func testDspAttrTestAuthKey2(t *testing.T) {
}
eRply := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1001_SIMPLEAUTH"},
AlteredFields: []string{"*req.Password"},
CGREvent: &utils.CGREvent{
@@ -530,7 +530,7 @@ func testDspAttrGetAttrRoundRobin(t *testing.T) {
}
eRply := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1002_SIMPLEAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1002_SIMPLEAUTH"},
AlteredFields: []string{"*req.Password"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
@@ -602,7 +602,7 @@ func testDspAttrGetAttrInternal(t *testing.T) {
}
eRply := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1003_SIMPLEAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1003_SIMPLEAUTH"},
AlteredFields: []string{"*req.Password"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",

View File

@@ -379,7 +379,7 @@ func testDspSessionUpdate(t *testing.T) {
t.Error(err)
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_ACNT_1001"},
MatchedProfiles: []string{"cgrates.org:ATTR_ACNT_1001"},
AlteredFields: []string{"*req.OfficeGroup"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
@@ -450,7 +450,7 @@ func testDspSessionUpdate2(t *testing.T) {
t.Fatal(err)
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1001_SESSIONAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1001_SESSIONAUTH"},
AlteredFields: []string{"*req.LCRProfile", "*req.Password", "*req.RequestType", "*req.PaypalAccount"},
CGREvent: &utils.CGREvent{
@@ -605,7 +605,7 @@ func testDspSessionProcessEvent(t *testing.T) {
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation)
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_ACNT_1001"},
MatchedProfiles: []string{"cgrates.org:ATTR_ACNT_1001"},
AlteredFields: []string{"*req.OfficeGroup"},
CGREvent: &utils.CGREvent{
@@ -684,7 +684,7 @@ func testDspSessionProcessEvent2(t *testing.T) {
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation)
}
eAttrs := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"},
MatchedProfiles: []string{"cgrates.org:ATTR_1001_SIMPLEAUTH"},
AlteredFields: []string{"*req.EventName", "*req.Password"},
CGREvent: &utils.CGREvent{

View File

@@ -46,7 +46,13 @@ func TestNatsEE(t *testing.T) {
if err != nil {
t.Fatal(err)
}
evExp, err := NewEventExporter(cfg, 5, new(engine.FilterS))
var idx int
for idx = range cfg.EEsCfg().Exporters {
if cfg.EEsCfg().Exporters[idx].ID == "NatsJsonMapExporter" {
break
}
}
evExp, err := NewEventExporter(cfg, idx, new(engine.FilterS))
if err != nil {
t.Fatal(err)
}

View File

@@ -346,25 +346,14 @@ func (sis StatQueues) Sort() {
sort.Slice(sis, func(i, j int) bool { return sis[i].sqPrfl.Weight > sis[j].sqPrfl.Weight })
}
type encStatQueue struct {
Tenant string
ID string
SQItems []SQItem
SQMetrics map[string]StatMetric
}
func (sq *StatQueue) MarshalJSON() (rply []byte, err error) {
if sq == nil {
return []byte("null"), nil
}
type tmp StatQueue
guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) {
sq.RLock()
rply, err = json.Marshal(&encStatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: sq.SQItems,
SQMetrics: sq.SQMetrics,
})
rply, err = json.Marshal(tmp(*sq))
sq.RUnlock()
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sq.TenantID())
@@ -414,7 +403,6 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) {
return fmt.Errorf("unsupported metric type <%s>", metricSplit[0])
}
if err = json.Unmarshal([]byte(val), metric); err != nil {
fmt.Println(1)
return
}
sq.SQMetrics[metricID] = metric
@@ -424,14 +412,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) {
func (sq *StatQueue) GobEncode() (rply []byte, err error) {
buf := bytes.NewBuffer(rply)
type tmp StatQueue
guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) {
sq.RLock()
err = gob.NewEncoder(buf).Encode(&encStatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: sq.SQItems,
SQMetrics: sq.SQMetrics,
})
err = gob.NewEncoder(buf).Encode(tmp(*sq))
sq.RUnlock()
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+sq.TenantID())
@@ -458,3 +442,58 @@ func (sq *StatQueue) Clone() (cln *StatQueue) {
}
return
}
func (ssq *StatQueueWithAPIOpts) MarshalJSON() (rply []byte, err error) {
if ssq == nil {
return []byte("null"), nil
}
type tmp struct {
StatQueue
APIOpts map[string]interface{}
}
guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) {
ssq.RLock()
rply, err = json.Marshal(tmp{
StatQueue: *ssq.StatQueue,
APIOpts: ssq.APIOpts,
})
ssq.RUnlock()
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+ssq.TenantID())
return
}
func (ssq *StatQueueWithAPIOpts) GobEncode() (rply []byte, err error) {
buf := bytes.NewBuffer(rply)
type tmp struct {
StatQueue
APIOpts map[string]interface{}
}
guardian.Guardian.Guard(context.Background(), func(*context.Context) (_ interface{}, _ error) {
ssq.RLock()
err = gob.NewEncoder(buf).Encode(tmp{
StatQueue: *ssq.StatQueue,
APIOpts: ssq.APIOpts,
})
ssq.RUnlock()
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.StatQueuePrefix+ssq.TenantID())
return buf.Bytes(), nil
}
// UnmarshalJSON here only to fully support json for StatQueue
func (ssq *StatQueueWithAPIOpts) UnmarshalJSON(data []byte) (err error) {
sq := new(StatQueue)
if err = json.Unmarshal(data, &sq); err != nil {
return
}
i := struct {
APIOpts map[string]interface{}
}{}
if err = json.Unmarshal(data, &i); err != nil {
return
}
ssq.StatQueue = sq
ssq.APIOpts = i.APIOpts
return
}

View File

@@ -697,7 +697,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
}
func TestStatQueueJSONMarshall(t *testing.T) {
rply := new(StatQueue)
var rply *StatQueue
exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{
{MetricID: utils.MetaASR},
{MetricID: utils.MetaTCD},
@@ -705,10 +705,31 @@ func TestStatQueueJSONMarshall(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = json.Unmarshal([]byte(utils.ToJSON(exp)), rply); err != nil {
if err = json.Unmarshal([]byte(utils.ToJSON(exp)), &rply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rply, exp) {
t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
func TestStatQueueWithAPIOptsJSONMarshall(t *testing.T) {
rply := &StatQueueWithAPIOpts{}
exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{
{MetricID: utils.MetaASR},
{MetricID: utils.MetaTCD},
}, 1)
exp2 := &StatQueueWithAPIOpts{
StatQueue: exp,
APIOpts: map[string]interface{}{"a": "a"},
}
if err != nil {
t.Fatal(err)
}
if err = json.Unmarshal([]byte(utils.ToJSON(exp2)), rply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rply, exp2) {
t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp2), utils.ToJSON(rply))
}
}

View File

@@ -1018,7 +1018,7 @@ func testV1FltrAttributesPrefix(t *testing.T) {
processedEv := &engine.AttrSProcessEventReply{
AlteredFields: []string{"*req.CustomField"},
MatchedProfiles: []string{"cgrates.org:ATTR_1001"},
MatchedProfiles: []string{"cgrates.new:ATTR_1001"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.new",