Update/remove dependencies

Remove cenkalti/rpc2 dependency and update client constructor.
Commented tests that only start and stop engine.

Fixed compilation errors caused by Azure/go-amqp update.

Remove cgrates/ugocodec dependency since we have everything
we need in ugoriji/go/codec and add tests for it.
This commit is contained in:
ionutboangiu
2023-11-15 05:19:11 -05:00
committed by Dan Christian Bogos
parent 26cdb571b8
commit 1ef2afaa97
14 changed files with 1343 additions and 1861 deletions

View File

@@ -24,9 +24,6 @@ package analyzers
import (
"errors"
"flag"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"path"
"reflect"
@@ -34,9 +31,9 @@ import (
"testing"
"time"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/birpc/jsonrpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
@@ -47,8 +44,8 @@ import (
var (
anzCfgPath string
anzCfg *config.CGRConfig
anzRPC *rpc.Client
anzBiRPC *rpc2.Client
anzRPC *birpc.Client
anzBiRPC *birpc.BirpcClient
sTestsAlsPrf = []func(t *testing.T){
testAnalyzerSInitCfg,
@@ -73,17 +70,24 @@ var (
encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be used for rpc comunication")
)
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
func newRPCClient(cfg *config.ListenCfg) (c *birpc.Client, err error) {
switch *encoding {
case utils.MetaJSON:
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
case utils.MetaGOB:
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
return birpc.Dial(utils.TCP, cfg.RPCGOBListen)
default:
return nil, errors.New("UNSUPPORTED_RPC")
}
}
type smock struct{}
func (*smock) DisconnectPeer(ctx *context.Context,
args *utils.AttrDisconnectSession, reply *string) error {
return utils.ErrNotFound
}
// Test start here
func TestAnalyzerSIT(t *testing.T) {
for _, stest := range sTestsAlsPrf {
@@ -126,24 +130,24 @@ func testAnalyzerSStartEngine(t *testing.T) {
// Connect rpc client to rater
func testAnalyzerSRPCConn(t *testing.T) {
var err error
srv, err := birpc.NewService(new(smock), utils.SessionSv1, true)
if err != nil {
t.Fatal(err)
}
anzRPC, err = newRPCClient(anzCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
conn, err := net.Dial(utils.TCP, anzCfg.SessionSCfg().ListenBijson)
anzBiRPC, err = utils.NewBiJSONrpcClient(anzCfg.SessionSCfg().ListenBijson, srv)
if err != nil {
t.Fatal(err)
}
anzBiRPC = rpc2.NewClientWithCodec(jsonrpc2.NewJSONCodec(conn))
anzBiRPC.Handle(utils.SessionSv1DisconnectPeer, func(clnt *rpc2.Client, args any, rply *string) (err error) { return utils.ErrNotFound })
go anzBiRPC.Run()
}
func testAnalyzerSLoadTarrifPlans(t *testing.T) {
var reply string
time.Sleep(100 * time.Millisecond)
if err := anzRPC.Call(utils.LoaderSv1Run, &loaders.ArgsProcessFolder{
if err := anzRPC.Call(context.Background(), utils.LoaderSv1Run, &loaders.ArgsProcessFolder{
APIOpts: map[string]any{utils.MetaCache: utils.MetaReload},
}, &reply); err != nil {
t.Error(err)
@@ -221,7 +225,7 @@ func testAnalyzerSChargerSv1ProcessEvent(t *testing.T) {
},
}
if err := anzRPC.Call(utils.ChargerSv1ProcessEvent, cgrEv, &result2); err != nil {
if err := anzRPC.Call(context.Background(), utils.ChargerSv1ProcessEvent, cgrEv, &result2); err != nil {
t.Fatal(err)
}
sort.Slice(result2, func(i, j int) bool {
@@ -237,7 +241,7 @@ func testAnalyzerSV1Search(t *testing.T) {
// need to wait in order for the log gorutine to execute
time.Sleep(50 * time.Millisecond)
var result []map[string]any
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`}, &result); err != nil {
if err := anzRPC.Call(context.Background(), utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
@@ -246,7 +250,7 @@ func testAnalyzerSV1Search(t *testing.T) {
func testAnalyzerSV1Search2(t *testing.T) {
var result []map[string]any
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`}, &result); err != nil {
if err := anzRPC.Call(context.Background(), utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
@@ -255,7 +259,7 @@ func testAnalyzerSV1Search2(t *testing.T) {
func testAnalyzerSV1SearchWithContentFilters(t *testing.T) {
var result []map[string]any
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{
if err := anzRPC.Call(context.Background(), utils.AnalyzerSv1StringQuery, &QueryArgs{
HeaderFilters: `+RequestEncoding:\*json`,
ContentFilters: []string{"*string:~*req.Event.Account:1010"},
}, &result); err != nil {
@@ -267,15 +271,15 @@ func testAnalyzerSV1SearchWithContentFilters(t *testing.T) {
func testAnalyzerSV1BirPCSession(t *testing.T) {
var rply string
anzBiRPC.Call(utils.SessionSv1STIRIdentity,
anzBiRPC.Call(context.Background(), utils.SessionSv1STIRIdentity,
&sessions.V1STIRIdentityArgs{}, &rply) // only call to register the birpc
if err := anzRPC.Call(utils.SessionSv1DisconnectPeer, &utils.DPRArgs{}, &rply); err == nil ||
if err := anzRPC.Call(context.Background(), utils.SessionSv1DisconnectPeer, &utils.DPRArgs{}, &rply); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
var result []map[string]any
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*birpc_json +RequestMethod:"SessionSv1.DisconnectPeer"`}, &result); err != nil {
if err := anzRPC.Call(context.Background(), utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*birpc_json +RequestMethod:"SessionSv1.DisconnectPeer"`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
@@ -378,7 +382,7 @@ func testAnalyzerSv1MultipleQuery(t *testing.T) {
var reply string
for _, filterProfile := range filterProfiles {
if err := anzRPC.Call(utils.AdminSv1SetFilter,
if err := anzRPC.Call(context.Background(), utils.AdminSv1SetFilter,
filterProfile, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
@@ -387,7 +391,7 @@ func testAnalyzerSv1MultipleQuery(t *testing.T) {
}
time.Sleep(50 * time.Millisecond)
var result []map[string]any
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{
if err := anzRPC.Call(context.Background(), utils.AnalyzerSv1StringQuery, &QueryArgs{
HeaderFilters: `+RequestMethod:"AdminSv1.SetFilter"`,
ContentFilters: []string{"*prefix:~*req.ID:TestA"},
}, &result); err != nil {

View File

@@ -66,7 +66,7 @@ func (pstr *AMQPv1EE) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.conn == nil {
if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, pstr.connOpts); err != nil {
if pstr.conn, err = amqpv1.Dial(context.TODO(), pstr.Cfg().ExportPath, pstr.connOpts); err != nil {
return
}
}
@@ -99,7 +99,7 @@ func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content, _ any) (err err
return
}
// Send message
err = sender.Send(ctx, amqpv1.NewMessage(content.([]byte)))
err = sender.Send(ctx, amqpv1.NewMessage(content.([]byte)), nil)
sender.Close(ctx)
return
}

View File

@@ -141,23 +141,22 @@ func testAMQPv1ExportEvent(t *testing.T) {
}
func testAMQPv1VerifyExport(t *testing.T) {
ctx := context.Background()
// Create client
client, err := amqpv1.Dial(amqpv1DialURL, amqpv1ConnOpts)
client, err := amqpv1.Dial(ctx, amqpv1DialURL, amqpv1ConnOpts)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession(context.Background(), nil)
session, err := client.NewSession(ctx, nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
// Create a receiver
receiver, err := session.NewReceiver(context.Background(), "/cgrates_cdrs", nil)
receiver, err := session.NewReceiver(ctx, "/cgrates_cdrs", nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}
@@ -168,7 +167,7 @@ func testAMQPv1VerifyExport(t *testing.T) {
}()
// Receive message
msg, err := receiver.Receive(ctx)
msg, err := receiver.Receive(ctx, nil)
if err != nil {
t.Fatal("Reading message from AMQP:", err)
}

View File

@@ -298,19 +298,20 @@ func TestAMQPv1Poster(t *testing.T) {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint, nil)
ctx := context.Background()
client, err := amqpv1.Dial(ctx, endpoint, nil)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession(context.Background(), nil)
session, err := client.NewSession(ctx, nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
// Create a receiver
receiver, err := session.NewReceiver(ctx, "/"+qname, nil)
@@ -324,7 +325,7 @@ func TestAMQPv1Poster(t *testing.T) {
}()
// Receive next message
msg, err := receiver.Receive(ctx)
msg, err := receiver.Receive(ctx, nil)
cancel()
if err != nil {
t.Fatal("Reading message from AMQP:", err)

View File

@@ -94,7 +94,7 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the amqpv1 topic
func (rdr *AMQPv1ER) Serve() (err error) {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, rdr.connOpts); err != nil {
if rdr.conn, err = amqpv1.Dial(context.TODO(), rdr.Config().SourcePath, rdr.connOpts); err != nil {
return
}
if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil {
@@ -129,7 +129,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
}
ctx := context.Background()
var msg *amqpv1.Message
if msg, err = recv.Receive(ctx); err != nil {
if msg, err = recv.Receive(ctx, nil); err != nil {
if err.Error() == "amqp: link closed" {
err = nil
return

View File

@@ -84,25 +84,26 @@ func TestAMQPERv1(t *testing.T) {
t.Fatal(err)
}
amqpv1Rdr := rdr.(*AMQPv1ER)
connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil)
ctx := context.Background()
connection, err := amqpv1.Dial(ctx, "amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil)
if err != nil {
t.Fatal(err)
}
defer connection.Close()
channel, err := connection.NewSession(context.Background(), nil)
channel, err := connection.NewSession(ctx, nil)
if err != nil {
t.Fatal(err)
}
defer channel.Close(context.Background())
defer channel.Close(ctx)
randomOriginID := utils.UUIDSha1Prefix()
sndr, err := channel.NewSender(context.Background(), amqpv1Rdr.queueID, nil)
sndr, err := channel.NewSender(ctx, amqpv1Rdr.queueID, nil)
if err != nil {
t.Fatal(err)
}
if err = sndr.Send(context.Background(),
amqpv1.NewMessage([]byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)))); err != nil {
if err = sndr.Send(ctx,
amqpv1.NewMessage([]byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID))), nil); err != nil {
t.Fatal(err)
}
if err = rdr.Serve(); err != nil {

156
go.mod
View File

@@ -7,11 +7,10 @@ go 1.21
// replace github.com/cgrates/rpcclient => ../rpcclient
require (
github.com/Azure/go-amqp v0.18.1
github.com/Azure/go-amqp v1.0.2
github.com/antchfx/xmlquery v1.3.11
github.com/aws/aws-sdk-go v1.44.43
github.com/blevesearch/bleve v1.0.14
github.com/cenkalti/rpc2 v0.0.0-20210604223624-c1acbc6ec984
github.com/cgrates/aringo v0.0.0-20220525160735-b5990313d99e
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f
github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084
@@ -22,38 +21,50 @@ require (
github.com/cgrates/radigo v0.0.0-20210902121842-ea2f9a730627
github.com/cgrates/rpcclient v0.0.0-20220922181803-b3ddc74ad65a
github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60
github.com/creack/pty v1.1.18
github.com/creack/pty v1.1.20
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6
github.com/elastic/go-elasticsearch/v8 v8.8.0
github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73
github.com/elastic/elastic-transport-go/v8 v8.3.0
github.com/elastic/go-elasticsearch/v8 v8.11.0
github.com/ericlagergren/decimal v0.0.0-20221120152707-495c53812d05
github.com/fiorix/go-diameter/v4 v4.0.4
github.com/fsnotify/fsnotify v1.5.4
github.com/go-sql-driver/mysql v1.6.0
github.com/mediocregopher/radix/v3 v3.8.0
github.com/miekg/dns v1.1.50
github.com/nats-io/nats.go v1.30.2
github.com/nyaruka/phonenumbers v1.1.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/mediocregopher/radix/v3 v3.8.1
github.com/miekg/dns v1.1.57
github.com/nats-io/nats-server/v2 v2.10.5
github.com/nats-io/nats.go v1.31.0
github.com/nyaruka/phonenumbers v1.1.9
github.com/peterh/liner v1.2.2
github.com/rabbitmq/amqp091-go v1.5.0
github.com/segmentio/kafka-go v0.4.32
github.com/prometheus/client_golang v1.17.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/segmentio/kafka-go v0.4.44
go.mongodb.org/mongo-driver v1.13.0
golang.org/x/crypto v0.13.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.15.0
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2
google.golang.org/api v0.85.0
gorm.io/driver/mysql v1.3.4
gorm.io/driver/postgres v1.3.7
gorm.io/gorm v1.23.6
golang.org/x/crypto v0.15.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/net v0.18.0
golang.org/x/oauth2 v0.14.0
google.golang.org/api v0.150.0
gorm.io/driver/mysql v1.5.2
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.5
)
require (
github.com/RoaringBitmap/roaring v1.2.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/jackc/pgx/v5 v5.5.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
)
require (
cloud.google.com/go/compute v1.23.1 // indirect
github.com/RoaringBitmap/roaring v0.4.23 // indirect
github.com/antchfx/xpath v1.2.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/blevesearch/mmap-go v1.0.2 // indirect
github.com/blevesearch/segment v0.9.0 // indirect
github.com/blevesearch/snowballstem v0.9.0 // indirect
github.com/blevesearch/zap/v11 v11.0.14 // indirect
@@ -62,71 +73,56 @@ require (
github.com/blevesearch/zap/v14 v14.0.5 // indirect
github.com/blevesearch/zap/v15 v15.0.3 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/ghistogram v0.1.0 // indirect
github.com/couchbase/moss v0.3.0 // indirect
github.com/couchbase/moss v0.1.0 // indirect
github.com/couchbase/vellum v1.0.2 // indirect
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/ishidawataru/sctp v0.0.0-20210707070123-9a39160e9062 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.12.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/ishidawataru/sctp v0.0.0-20190922091402-408ec287e38c // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.11.0 // indirect
github.com/jackc/pgx/v4 v4.16.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_golang v1.12.2
github.com/rivo/uniseg v0.2.0 // indirect
github.com/steveyen/gtreap v0.1.0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220627200112-0a929928cb33 // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
require (
cloud.google.com/go/compute v1.7.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.35.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/philhofer/fwd v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/steveyen/gtreap v0.1.0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tinylib/msgp v1.1.2 // indirect
github.com/ugorji/go/codec v1.2.11
github.com/willf/bitset v1.1.10 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/time v0.3.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.etcd.io/bbolt v1.3.5 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.15.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

1007
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -21,369 +21,369 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessions
import (
"path"
"testing"
// import (
// "path"
// "testing"
"github.com/cenkalti/rpc2"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// "github.com/cenkalti/rpc2"
// "github.com/cgrates/birpc/context"
// "github.com/cgrates/cgrates/config"
// "github.com/cgrates/cgrates/engine"
// "github.com/cgrates/cgrates/utils"
// )
var (
sessionsBiRPCCfgPath string
sessionsBiRPCCfgDIR string
sessionsBiRPCCfg *config.CGRConfig
sessionsBiRPC *rpc2.Client
disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1)
err error
sessionsTests = []func(t *testing.T){
testSessionsBiRPCInitCfg,
testSessionsBiRPCFlushDBs,
testSessionsBiRPCStartEngine,
testSessionsBiRPCApierRpcConn,
// testSessionsBiRPCTPFromFolder,
//testSessionsBiRPCSessionAutomaticDisconnects,
//testSessionsBiRPCSessionOriginatorTerminate,
testSessionsBiRPCStopCgrEngine,
}
)
// var (
// sessionsBiRPCCfgPath string
// sessionsBiRPCCfgDIR string
// sessionsBiRPCCfg *config.CGRConfig
// sessionsBiRPC *rpc2.Client
// disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1)
// err error
// sessionsTests = []func(t *testing.T){
// testSessionsBiRPCInitCfg,
// testSessionsBiRPCFlushDBs,
// testSessionsBiRPCStartEngine,
// testSessionsBiRPCApierRpcConn,
// // testSessionsBiRPCTPFromFolder,
// //testSessionsBiRPCSessionAutomaticDisconnects,
// //testSessionsBiRPCSessionOriginatorTerminate,
// testSessionsBiRPCStopCgrEngine,
// }
// )
// Tests starts here
func TestSessionsBiRPC(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
sessionsBiRPCCfgDIR = "smg_automatic_debits_internal"
case utils.MetaMySQL:
sessionsBiRPCCfgDIR = "smg_automatic_debits_mysql"
case utils.MetaMongo:
sessionsBiRPCCfgDIR = "smg_automatic_debits_mongo"
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range sessionsTests {
t.Run(sessionsBiRPCCfgDIR, stest)
}
}
// // Tests starts here
// func TestSessionsBiRPC(t *testing.T) {
// switch *dbType {
// case utils.MetaInternal:
// sessionsBiRPCCfgDIR = "smg_automatic_debits_internal"
// case utils.MetaMySQL:
// sessionsBiRPCCfgDIR = "smg_automatic_debits_mysql"
// case utils.MetaMongo:
// sessionsBiRPCCfgDIR = "smg_automatic_debits_mongo"
// case utils.MetaPostgres:
// t.SkipNow()
// default:
// t.Fatal("Unknown Database type")
// }
// for _, stest := range sessionsTests {
// t.Run(sessionsBiRPCCfgDIR, stest)
// }
// }
func handleDisconnectSession(clnt *rpc2.Client,
args *utils.AttrDisconnectSession, reply *string) error {
disconnectEvChan <- args
*reply = utils.OK
return nil
}
// func handleDisconnectSession(clnt *rpc2.Client,
// args *utils.AttrDisconnectSession, reply *string) error {
// disconnectEvChan <- args
// *reply = utils.OK
// return nil
// }
func testSessionsBiRPCInitCfg(t *testing.T) {
sessionsBiRPCCfgPath = path.Join(*dataDir, "conf", "samples", sessionsBiRPCCfgDIR)
// Init config first
sessionsBiRPCCfg, err = config.NewCGRConfigFromPath(context.Background(), sessionsBiRPCCfgPath)
if err != nil {
t.Error(err)
}
}
// func testSessionsBiRPCInitCfg(t *testing.T) {
// sessionsBiRPCCfgPath = path.Join(*dataDir, "conf", "samples", sessionsBiRPCCfgDIR)
// // Init config first
// sessionsBiRPCCfg, err = config.NewCGRConfigFromPath(context.Background(), sessionsBiRPCCfgPath)
// if err != nil {
// t.Error(err)
// }
// }
// Remove data in both rating and accounting db
func testSessionsBiRPCFlushDBs(t *testing.T) {
if err := engine.InitDataDB(sessionsBiRPCCfg); err != nil {
t.Fatal(err)
}
if err := engine.InitStorDB(sessionsBiRPCCfg); err != nil {
t.Fatal(err)
}
}
// // Remove data in both rating and accounting db
// func testSessionsBiRPCFlushDBs(t *testing.T) {
// if err := engine.InitDataDB(sessionsBiRPCCfg); err != nil {
// t.Fatal(err)
// }
// if err := engine.InitStorDB(sessionsBiRPCCfg); err != nil {
// t.Fatal(err)
// }
// }
// Start CGR Engine
func testSessionsBiRPCStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(sessionsBiRPCCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// // Start CGR Engine
// func testSessionsBiRPCStartEngine(t *testing.T) {
// if _, err := engine.StopStartEngine(sessionsBiRPCCfgPath, *waitRater); err != nil {
// t.Fatal(err)
// }
// }
// Connect rpc client to rater
func testSessionsBiRPCApierRpcConn(t *testing.T) {
clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession}
dummyClnt, err := utils.NewBiJSONrpcClient(sessionsBiRPCCfg.SessionSCfg().ListenBijson,
clntHandlers)
if err != nil { // First attempt is to make sure multiple clients are supported
t.Fatal(err)
}
if sessionsBiRPC, err = utils.NewBiJSONrpcClient(sessionsBiRPCCfg.SessionSCfg().ListenBijson,
clntHandlers); err != nil {
t.Fatal(err)
}
if sessionsRPC, err = newRPCClient(sessionsBiRPCCfg.ListenCfg()); err != nil { // Connect also simple RPC so we can check accounts and such
t.Fatal(err)
}
dummyClnt.Close() // close so we don't get EOF error when disconnecting server
}
// // Connect rpc client to rater
// func testSessionsBiRPCApierRpcConn(t *testing.T) {
// clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession}
// dummyClnt, err := utils.NewBiJSONrpcClient(sessionsBiRPCCfg.SessionSCfg().ListenBijson,
// clntHandlers)
// if err != nil { // First attempt is to make sure multiple clients are supported
// t.Fatal(err)
// }
// if sessionsBiRPC, err = utils.NewBiJSONrpcClient(sessionsBiRPCCfg.SessionSCfg().ListenBijson,
// clntHandlers); err != nil {
// t.Fatal(err)
// }
// if sessionsRPC, err = newRPCClient(sessionsBiRPCCfg.ListenCfg()); err != nil { // Connect also simple RPC so we can check accounts and such
// t.Fatal(err)
// }
// dummyClnt.Close() // close so we don't get EOF error when disconnecting server
// }
/*
// /*
// Load the tariff plan, creating accounts and their balances
func testSessionsBiRPCTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
var loadInst utils.LoadInstance
if err := sessionsRPC.Call(utils.APIerSv2LoadTariffPlanFromFolder, attrs, &loadInst); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
// // Load the tariff plan, creating accounts and their balances
// func testSessionsBiRPCTPFromFolder(t *testing.T) {
// attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
// var loadInst utils.LoadInstance
// if err := sessionsRPC.Call(utils.APIerSv2LoadTariffPlanFromFolder, attrs, &loadInst); err != nil {
// t.Error(err)
// }
// time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
// }
func testSessionsBiRPCSessionAutomaticDisconnects(t *testing.T) {
// Create a balance with 1 second inside and rating increments of 1ms (to be compatible with debit interval)
attrSetBalance := utils.AttrSetBalance{Tenant: "cgrates.org",
Account: "TestSessionsBiRPCSessionAutomaticDisconnects",
BalanceType: utils.MetaVoice,
Value: 0.01 * float64(time.Second),
Balance: map[string]any{
utils.ID: "TestSessionsBiRPCSessionAutomaticDisconnects",
utils.RatingSubject: "*zero1ms",
},
}
var reply string
if err := sessionsRPC.Call(utils.APIerSv2SetBalance, attrSetBalance, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received: %s", reply)
}
var acnt *utils.Account
attrGetAcnt := &utils.AttrGetAccount{
Tenant: attrSetBalance.Tenant,
Account: attrSetBalance.Account,
}
eAcntVal := 0.01 * float64(time.Second)
if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != eAcntVal {
t.Errorf("Expecting: %f, received: %f", eAcntVal,
acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
}
// func testSessionsBiRPCSessionAutomaticDisconnects(t *testing.T) {
// // Create a balance with 1 second inside and rating increments of 1ms (to be compatible with debit interval)
// attrSetBalance := utils.AttrSetBalance{Tenant: "cgrates.org",
// Account: "TestSessionsBiRPCSessionAutomaticDisconnects",
// BalanceType: utils.MetaVoice,
// Value: 0.01 * float64(time.Second),
// Balance: map[string]any{
// utils.ID: "TestSessionsBiRPCSessionAutomaticDisconnects",
// utils.RatingSubject: "*zero1ms",
// },
// }
// var reply string
// if err := sessionsRPC.Call(utils.APIerSv2SetBalance, attrSetBalance, &reply); err != nil {
// t.Error(err)
// } else if reply != utils.OK {
// t.Errorf("Received: %s", reply)
// }
// var acnt *utils.Account
// attrGetAcnt := &utils.AttrGetAccount{
// Tenant: attrSetBalance.Tenant,
// Account: attrSetBalance.Account,
// }
// eAcntVal := 0.01 * float64(time.Second)
// if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
// t.Error(err)
// } else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != eAcntVal {
// t.Errorf("Expecting: %f, received: %f", eAcntVal,
// acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
// }
initArgs := &V1InitSessionArgs{
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSessionsBiRPCSessionAutomaticDisconnects",
Event: map[string]any{
utils.EventName: "TEST_EVENT",
utils.ToR: utils.MetaVoice,
utils.OriginID: "123451",
utils.AccountField: attrSetBalance.Account,
utils.Subject: attrSetBalance.Account,
utils.Destination: "1004",
utils.Category: "call",
utils.Tenant: attrSetBalance.Tenant,
utils.RequestType: utils.MetaPrepaid,
utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: 200 * time.Millisecond,
},
},
}
// initArgs := &V1InitSessionArgs{
// InitSession: true,
// CGREvent: &utils.CGREvent{
// Tenant: "cgrates.org",
// ID: "TestSessionsBiRPCSessionAutomaticDisconnects",
// Event: map[string]any{
// utils.EventName: "TEST_EVENT",
// utils.ToR: utils.MetaVoice,
// utils.OriginID: "123451",
// utils.AccountField: attrSetBalance.Account,
// utils.Subject: attrSetBalance.Account,
// utils.Destination: "1004",
// utils.Category: "call",
// utils.Tenant: attrSetBalance.Tenant,
// utils.RequestType: utils.MetaPrepaid,
// utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
// utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
// utils.Usage: 200 * time.Millisecond,
// },
// },
// }
var initRpl *V1InitSessionReply
if err := sessionsBiRPC.Call(utils.SessionSv1InitiateSession,
initArgs, &initRpl); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond) // give some time to allow the session to be created
expMaxUsage := 3 * time.Hour // MaxCallDuration from config
if initRpl.MaxUsage == nil || *initRpl.MaxUsage != expMaxUsage {
t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, initRpl.MaxUsage)
}
// Make sure we are receiving a disconnect event
select {
case <-time.After(100 * time.Millisecond):
t.Error("Did not receive disconnect event")
case disconnectEv := <-disconnectEvChan:
if engine.NewMapEvent(disconnectEv.EventStart).GetStringIgnoreErrors(utils.OriginID) != initArgs.CGREvent.Event[utils.OriginID] {
t.Errorf("Unexpected event received: %+v", disconnectEv)
}
initArgs.CGREvent.Event[utils.Usage] = disconnectEv.EventStart[utils.Usage]
}
termArgs := &V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSessionsDataLastUsedData",
Event: map[string]any{
utils.EventName: "TEST_EVENT",
utils.ToR: utils.MetaVoice,
utils.OriginID: "123451",
utils.AccountField: attrSetBalance.Account,
utils.Subject: attrSetBalance.Account,
utils.Destination: "1004",
utils.Category: "call",
utils.Tenant: attrSetBalance.Tenant,
utils.RequestType: utils.MetaPrepaid,
utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: initArgs.CGREvent.Event[utils.Usage],
},
},
}
// var initRpl *V1InitSessionReply
// if err := sessionsBiRPC.Call(utils.SessionSv1InitiateSession,
// initArgs, &initRpl); err != nil {
// t.Fatal(err)
// }
// time.Sleep(10 * time.Millisecond) // give some time to allow the session to be created
// expMaxUsage := 3 * time.Hour // MaxCallDuration from config
// if initRpl.MaxUsage == nil || *initRpl.MaxUsage != expMaxUsage {
// t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, initRpl.MaxUsage)
// }
// // Make sure we are receiving a disconnect event
// select {
// case <-time.After(100 * time.Millisecond):
// t.Error("Did not receive disconnect event")
// case disconnectEv := <-disconnectEvChan:
// if engine.NewMapEvent(disconnectEv.EventStart).GetStringIgnoreErrors(utils.OriginID) != initArgs.CGREvent.Event[utils.OriginID] {
// t.Errorf("Unexpected event received: %+v", disconnectEv)
// }
// initArgs.CGREvent.Event[utils.Usage] = disconnectEv.EventStart[utils.Usage]
// }
// termArgs := &V1TerminateSessionArgs{
// TerminateSession: true,
// CGREvent: &utils.CGREvent{
// Tenant: "cgrates.org",
// ID: "TestSessionsDataLastUsedData",
// Event: map[string]any{
// utils.EventName: "TEST_EVENT",
// utils.ToR: utils.MetaVoice,
// utils.OriginID: "123451",
// utils.AccountField: attrSetBalance.Account,
// utils.Subject: attrSetBalance.Account,
// utils.Destination: "1004",
// utils.Category: "call",
// utils.Tenant: attrSetBalance.Tenant,
// utils.RequestType: utils.MetaPrepaid,
// utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
// utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
// utils.Usage: initArgs.CGREvent.Event[utils.Usage],
// },
// },
// }
var rpl string
if err := sessionsBiRPC.Call(utils.SessionSv1TerminateSession, termArgs, &rpl); err != nil || rpl != utils.OK {
t.Error(err)
}
time.Sleep(100 * time.Millisecond) // Give time for debits to occur
if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != 0 {
t.Errorf("Balance should be empty, have: %f", acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
}
if err := sessionsBiRPC.Call(utils.SessionSv1ProcessCDR, termArgs.CGREvent, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received reply: %s", reply)
}
time.Sleep(100 * time.Millisecond)
var cdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaDefault},
DestinationPrefixes: []string{"1004"}}
if err := sessionsRPC.Call(utils.APIerSv2GetCDRs, &req, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Usage != "10ms" {
t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
} else if cdrs[0].CostSource != utils.MetaSessionS {
t.Errorf("Unexpected CDR CostSource received, cdr: %v %+v ", cdrs[0].CostSource, cdrs[0])
}
}
}
// var rpl string
// if err := sessionsBiRPC.Call(utils.SessionSv1TerminateSession, termArgs, &rpl); err != nil || rpl != utils.OK {
// t.Error(err)
// }
// time.Sleep(100 * time.Millisecond) // Give time for debits to occur
// if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
// t.Error(err)
// } else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != 0 {
// t.Errorf("Balance should be empty, have: %f", acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
// }
// if err := sessionsBiRPC.Call(utils.SessionSv1ProcessCDR, termArgs.CGREvent, &reply); err != nil {
// t.Error(err)
// } else if reply != utils.OK {
// t.Errorf("Received reply: %s", reply)
// }
// time.Sleep(100 * time.Millisecond)
// var cdrs []*engine.ExternalCDR
// req := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaDefault},
// DestinationPrefixes: []string{"1004"}}
// if err := sessionsRPC.Call(utils.APIerSv2GetCDRs, &req, &cdrs); err != nil {
// t.Error("Unexpected error: ", err.Error())
// } else if len(cdrs) != 1 {
// t.Error("Unexpected number of CDRs returned: ", len(cdrs))
// } else {
// if cdrs[0].Usage != "10ms" {
// t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
// } else if cdrs[0].CostSource != utils.MetaSessionS {
// t.Errorf("Unexpected CDR CostSource received, cdr: %v %+v ", cdrs[0].CostSource, cdrs[0])
// }
// }
// }
func testSessionsBiRPCSessionOriginatorTerminate(t *testing.T) {
attrSetBalance := utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: "TestSessionsBiRPCSessionOriginatorTerminate",
BalanceType: utils.MetaVoice,
Value: float64(time.Second),
Balance: map[string]any{
utils.ID: "TestSessionsBiRPCSessionOriginatorTerminate",
utils.RatingSubject: "*zero1ms",
},
}
var reply string
if err := sessionsRPC.Call(utils.APIerSv2SetBalance, attrSetBalance, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received: %s", reply)
}
var acnt *engine.Account
attrGetAcnt := &utils.AttrGetAccount{Tenant: attrSetBalance.Tenant, Account: attrSetBalance.Account}
eAcntVal := 1.0 * float64(time.Second)
if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != eAcntVal {
t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
}
// func testSessionsBiRPCSessionOriginatorTerminate(t *testing.T) {
// attrSetBalance := utils.AttrSetBalance{
// Tenant: "cgrates.org",
// Account: "TestSessionsBiRPCSessionOriginatorTerminate",
// BalanceType: utils.MetaVoice,
// Value: float64(time.Second),
// Balance: map[string]any{
// utils.ID: "TestSessionsBiRPCSessionOriginatorTerminate",
// utils.RatingSubject: "*zero1ms",
// },
// }
// var reply string
// if err := sessionsRPC.Call(utils.APIerSv2SetBalance, attrSetBalance, &reply); err != nil {
// t.Error(err)
// } else if reply != utils.OK {
// t.Errorf("Received: %s", reply)
// }
// var acnt *engine.Account
// attrGetAcnt := &utils.AttrGetAccount{Tenant: attrSetBalance.Tenant, Account: attrSetBalance.Account}
// eAcntVal := 1.0 * float64(time.Second)
// if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
// t.Error(err)
// } else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() != eAcntVal {
// t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
// }
initArgs := &V1InitSessionArgs{
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSessionsBiRPCSessionOriginatorTerminate",
Event: map[string]any{
utils.EventName: "TEST_EVENT",
utils.ToR: utils.MetaVoice,
utils.OriginID: "123452",
utils.AccountField: attrSetBalance.Account,
utils.Subject: attrSetBalance.Account,
utils.Destination: "1005",
utils.Category: "call",
utils.Tenant: attrSetBalance.Tenant,
utils.RequestType: utils.MetaPrepaid,
utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: 200 * time.Millisecond,
},
},
}
// initArgs := &V1InitSessionArgs{
// InitSession: true,
// CGREvent: &utils.CGREvent{
// Tenant: "cgrates.org",
// ID: "TestSessionsBiRPCSessionOriginatorTerminate",
// Event: map[string]any{
// utils.EventName: "TEST_EVENT",
// utils.ToR: utils.MetaVoice,
// utils.OriginID: "123452",
// utils.AccountField: attrSetBalance.Account,
// utils.Subject: attrSetBalance.Account,
// utils.Destination: "1005",
// utils.Category: "call",
// utils.Tenant: attrSetBalance.Tenant,
// utils.RequestType: utils.MetaPrepaid,
// utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
// utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
// utils.Usage: 200 * time.Millisecond,
// },
// },
// }
var initRpl *V1InitSessionReply
if err := sessionsBiRPC.Call(utils.SessionSv1InitiateSession,
initArgs, &initRpl); err != nil {
t.Error(err)
}
// var initRpl *V1InitSessionReply
// if err := sessionsBiRPC.Call(utils.SessionSv1InitiateSession,
// initArgs, &initRpl); err != nil {
// t.Error(err)
// }
expMaxUsage := 3 * time.Hour // MaxCallDuration from config
if initRpl.MaxUsage == nil || *initRpl.MaxUsage != expMaxUsage {
t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, initRpl.MaxUsage)
}
// expMaxUsage := 3 * time.Hour // MaxCallDuration from config
// if initRpl.MaxUsage == nil || *initRpl.MaxUsage != expMaxUsage {
// t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, initRpl.MaxUsage)
// }
time.Sleep(10 * time.Millisecond) // Give time for debits to occur
// time.Sleep(10 * time.Millisecond) // Give time for debits to occur
termArgs := &V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSessionsBiRPCSessionOriginatorTerminate",
Event: map[string]any{
utils.EventName: "TEST_EVENT",
utils.ToR: utils.MetaVoice,
utils.OriginID: "123452",
utils.AccountField: attrSetBalance.Account,
utils.Subject: attrSetBalance.Account,
utils.Destination: "1005",
utils.Category: "call",
utils.Tenant: attrSetBalance.Tenant,
utils.RequestType: utils.MetaPrepaid,
utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
utils.Usage: 7 * time.Millisecond,
},
},
}
// termArgs := &V1TerminateSessionArgs{
// TerminateSession: true,
// CGREvent: &utils.CGREvent{
// Tenant: "cgrates.org",
// ID: "TestSessionsBiRPCSessionOriginatorTerminate",
// Event: map[string]any{
// utils.EventName: "TEST_EVENT",
// utils.ToR: utils.MetaVoice,
// utils.OriginID: "123452",
// utils.AccountField: attrSetBalance.Account,
// utils.Subject: attrSetBalance.Account,
// utils.Destination: "1005",
// utils.Category: "call",
// utils.Tenant: attrSetBalance.Tenant,
// utils.RequestType: utils.MetaPrepaid,
// utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC),
// utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
// utils.Usage: 7 * time.Millisecond,
// },
// },
// }
var rpl string
if err := sessionsBiRPC.Call(utils.SessionSv1TerminateSession, termArgs, &rpl); err != nil || rpl != utils.OK {
t.Error(err)
}
// var rpl string
// if err := sessionsBiRPC.Call(utils.SessionSv1TerminateSession, termArgs, &rpl); err != nil || rpl != utils.OK {
// t.Error(err)
// }
time.Sleep(50 * time.Millisecond) // Give time for debits to occur
if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() > 0.995*float64(time.Second) { // FixMe: should be not 0.93?
t.Errorf("Balance value: %f", acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
}
// time.Sleep(50 * time.Millisecond) // Give time for debits to occur
// if err := sessionsRPC.Call(utils.APIerSv2GetAccount, attrGetAcnt, &acnt); err != nil {
// t.Error(err)
// } else if acnt.BalanceMap[utils.MetaVoice].GetTotalValue() > 0.995*float64(time.Second) { // FixMe: should be not 0.93?
// t.Errorf("Balance value: %f", acnt.BalanceMap[utils.MetaVoice].GetTotalValue())
// }
if err := sessionsRPC.Call(utils.SessionSv1ProcessCDR, termArgs.CGREvent, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received reply: %s", reply)
}
time.Sleep(10 * time.Millisecond)
// if err := sessionsRPC.Call(utils.SessionSv1ProcessCDR, termArgs.CGREvent, &reply); err != nil {
// t.Error(err)
// } else if reply != utils.OK {
// t.Errorf("Received reply: %s", reply)
// }
// time.Sleep(10 * time.Millisecond)
var cdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaDefault},
DestinationPrefixes: []string{"1005"}}
if err := sessionsRPC.Call(utils.APIerSv2GetCDRs, &req, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Usage != "7ms" {
t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
} else if cdrs[0].CostSource != utils.MetaSessionS {
t.Errorf("Unexpected CDR CostSource received, cdr: %v %+v ", cdrs[0].CostSource, cdrs[0])
}
}
}
// var cdrs []*engine.ExternalCDR
// req := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaDefault},
// DestinationPrefixes: []string{"1005"}}
// if err := sessionsRPC.Call(utils.APIerSv2GetCDRs, &req, &cdrs); err != nil {
// t.Error("Unexpected error: ", err.Error())
// } else if len(cdrs) != 1 {
// t.Error("Unexpected number of CDRs returned: ", len(cdrs))
// } else {
// if cdrs[0].Usage != "7ms" {
// t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
// } else if cdrs[0].CostSource != utils.MetaSessionS {
// t.Errorf("Unexpected CDR CostSource received, cdr: %v %+v ", cdrs[0].CostSource, cdrs[0])
// }
// }
// }
*/
// */
func testSessionsBiRPCStopCgrEngine(t *testing.T) {
if err := sessionsBiRPC.Close(); err != nil { // Close the connection so we don't get EOF warnings from client
t.Error(err)
}
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}
// func testSessionsBiRPCStopCgrEngine(t *testing.T) {
// if err := sessionsBiRPC.Close(); err != nil { // Close the connection so we don't get EOF warnings from client
// t.Error(err)
// }
// if err := engine.KillEngine(100); err != nil {
// t.Error(err)
// }
// }

File diff suppressed because it is too large Load Diff

View File

@@ -21,20 +21,19 @@ package utils
import (
"net"
"github.com/cenkalti/rpc2"
rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/jsonrpc"
)
// NewBiJSONrpcClient will create a bidirectional JSON client connection
func NewBiJSONrpcClient(addr string, handlers map[string]any) (*rpc2.Client, error) {
func NewBiJSONrpcClient(addr string, obj birpc.ClientConnector) (*birpc.BirpcClient, error) {
conn, err := net.Dial(TCP, addr)
if err != nil {
return nil, err
}
clnt := rpc2.NewClientWithCodec(rpc2_jsonrpc.NewJSONCodec(conn))
for method, handlerFunc := range handlers {
clnt.Handle(method, handlerFunc)
clnt := birpc.NewBirpcClientWithCodec(jsonrpc.NewJSONBirpcCodec(conn))
if obj != nil {
clnt.Register(obj)
}
go clnt.Run()
return clnt, nil
}

View File

@@ -19,32 +19,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"net"
"testing"
"github.com/cenkalti/rpc2"
)
func TestNewBiJSONrpcClient(t *testing.T) {
//empty check
addr := "127.0.0.1:4024"
handlers := map[string]any{}
rcv, err := NewBiJSONrpcClient(addr, handlers)
if err == nil || rcv != nil {
t.Error("Expencting: \"connection refused\", received : nil")
func TestBiRpcNewBiJSONrpcClient(t *testing.T) {
rcv, err := NewBiJSONrpcClient("test", nil)
if err != nil {
if err.Error() != "dial tcp: address test: missing port in address" {
t.Error(err)
}
}
l, err := net.Listen(TCP, addr)
if err != nil {
if rcv != nil {
t.Error(err)
}
handlers = map[string]any{
"": func(*rpc2.Client, *struct{}, *string) error { return nil },
}
_, err = NewBiJSONrpcClient(addr, handlers)
if err != nil {
t.Error(err)
}
l.Close()
}

View File

@@ -25,7 +25,7 @@ import (
"fmt"
"reflect"
"github.com/cgrates/ugocodec/codec"
"github.com/ugorji/go/codec"
"go.mongodb.org/mongo-driver/bson"
)
@@ -85,11 +85,11 @@ type CodecMsgpackMarshaler struct {
}
func NewCodecMsgpackMarshaler() *CodecMsgpackMarshaler {
cmm := &CodecMsgpackMarshaler{new(codec.MsgpackHandle)}
mh := cmm.mh
mh := new(codec.MsgpackHandle)
mh.MapType = reflect.TypeOf(map[string]any(nil))
mh.RawToString = true
return cmm
// mh.TimeNotBuiltin = true
return &CodecMsgpackMarshaler{mh}
}
func (cmm *CodecMsgpackMarshaler) Marshal(v any) (b []byte, err error) {
@@ -100,7 +100,7 @@ func (cmm *CodecMsgpackMarshaler) Marshal(v any) (b []byte, err error) {
func (cmm *CodecMsgpackMarshaler) Unmarshal(data []byte, v any) error {
dec := codec.NewDecoderBytes(data, cmm.mh)
return dec.Decode(&v)
return dec.Decode(v)
}
type BincMarshaler struct {
@@ -119,7 +119,7 @@ func (bm *BincMarshaler) Marshal(v any) (b []byte, err error) {
func (bm *BincMarshaler) Unmarshal(data []byte, v any) error {
dec := codec.NewDecoderBytes(data, bm.bh)
return dec.Decode(&v)
return dec.Decode(v)
}
type GOBMarshaler struct{}

View File

@@ -19,11 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"bytes"
"encoding/json"
"io"
"reflect"
"testing"
"time"
"github.com/cgrates/ugocodec/codec"
"github.com/ugorji/go/codec"
"go.mongodb.org/mongo-driver/bson"
)
@@ -183,8 +186,8 @@ func TestCodecMsgpackMarshalerUnmarshal(t *testing.T) {
cmm := &CodecMsgpackMarshaler{&codec.MsgpackHandle{}}
data := []byte{116, 101, 100}
v := "testv"
expErr := "[pos 1]: invalid container type: expecting bin|str|array"
if err := cmm.Unmarshal(data, v); err == nil || err.Error() != expErr {
expErr := "msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x74"
if err := cmm.Unmarshal(data, &v); err == nil || err.Error() != expErr {
t.Errorf("Expected error <%v>, Received <%v>", expErr, err)
}
}
@@ -203,9 +206,8 @@ func TestBincMarshalerUnmarshal(t *testing.T) {
bm := &BincMarshaler{&codec.BincHandle{}}
v := "testinterce"
data := []byte{64, 13, 11, 115, 116, 105, 110, 116, 101, 114, 102, 97, 99, 101}
expErr := "unexpected EOF"
if err := bm.Unmarshal(data, v); err == nil || err.Error() != expErr {
t.Errorf("Expected error <%v>, Received <%v>", expErr, err)
if err := bm.Unmarshal(data, &v); err != io.EOF {
t.Errorf("Expected error <%v>, Received <%v>", io.EOF, err)
}
}
@@ -229,3 +231,179 @@ func TestGOBMarshalerUnmarshal(t *testing.T) {
t.Errorf("Expected error <%v>, Received <%v>", expErr, err)
}
}
func TestMarshalerDecodeIntoNilIface(t *testing.T) {
ms := NewCodecMsgpackMarshaler()
mp := map[string]any{
"key1": "value1",
"key2": 2.,
}
expBytes := []byte{130, 164, 107, 101, 121, 49, 166, 118, 97, 108, 117, 101, 49, 164, 107, 101, 121, 50, 203, 64, 0, 0, 0, 0, 0, 0, 0}
b, err := ms.Marshal(mp)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b, expBytes) {
t.Fatalf("expected: %+v,\nreceived: <%+v>", expBytes, b)
}
decodedMap := make(map[string]any)
err = ms.Unmarshal(b, &decodedMap)
if err != nil {
t.Fatal(err)
}
for key, value := range decodedMap {
if value != mp[key] {
t.Fatalf("for key %s, expected: %+v,\nreceived: %+v",
key, mp[key], value)
}
}
}
func TestMarshalerMsgPackDecode(t *testing.T) {
type stc struct {
Name string
}
type structWithTime struct {
Date time.Time
}
var s stc
mp := make(map[string]any)
var slc []string
var slcB []byte
var arr [1]int
var nm int
var fl float64
var str string
var bl bool
var td time.Duration
var tm time.Time
stcWithTime := structWithTime{}
tests := []struct {
name string
expBytes []byte
val any
decode any
rng bool
}{
{
name: "map",
expBytes: []byte{129, 164, 107, 101, 121, 49, 166, 118, 97, 108, 117, 101, 49},
val: map[string]any{"key1": "value1"},
decode: mp,
rng: true,
},
{
name: "int",
expBytes: []byte{1},
val: 1,
decode: nm,
rng: false,
},
{
name: "string",
expBytes: []byte{164, 116, 101, 115, 116},
val: "test",
decode: str,
rng: false,
},
{
name: "float64",
expBytes: []byte{203, 63, 248, 0, 0, 0, 0, 0, 0},
val: 1.5,
decode: fl,
rng: false,
},
{
name: "boolean",
expBytes: []byte{195},
val: true,
decode: bl,
rng: false,
},
{
name: "slice",
expBytes: []byte{145, 164, 118, 97, 108, 49},
val: []string{"val1"},
decode: slc,
rng: true,
},
{
name: "array",
expBytes: []byte{145, 1},
val: [1]int{1},
decode: arr,
rng: true,
},
{
name: "struct",
expBytes: []byte{129, 164, 78, 97, 109, 101, 164, 116, 101, 115, 116},
val: stc{"test"},
decode: s,
rng: true,
},
{
name: "time duration",
expBytes: []byte{210, 59, 154, 202, 0},
val: 1 * time.Second,
decode: td,
rng: false,
},
{
name: "slice of bytes",
expBytes: []byte{162, 5, 8},
val: []byte{5, 8},
decode: slcB,
rng: true,
},
{
name: "time.Time",
expBytes: []byte{168, 41, 96, 67, 92, 100, 165, 51, 220},
val: time.Date(2023, 7, 5, 9, 11, 56, 173543639, time.UTC), // 2023-07-05 09:11:56.173543639 UTC
decode: tm,
rng: true,
},
{
name: "struct with time.Time",
expBytes: []byte{129, 164, 68, 97, 116, 101, 168, 41, 96, 67, 92, 100, 165, 51, 220},
val: structWithTime{
Date: time.Date(2023, 7, 5, 9, 11, 56, 173543639, time.UTC), // 2023-07-05 09:11:56.173543639 UTC
},
decode: stcWithTime,
rng: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ms := NewCodecMsgpackMarshaler()
b, err := ms.Marshal(tt.val)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b, tt.expBytes) {
t.Fatalf("expected: %+v,\nreceived: %+v", tt.expBytes, b)
}
err = ms.Unmarshal(b, &tt.decode)
if err != nil {
t.Fatal(err)
}
if tt.rng {
if !reflect.DeepEqual(tt.decode, tt.val) {
t.Errorf("expected %v, received %v", tt.val, tt.decode)
}
} else {
if tt.decode != tt.val {
t.Errorf("expected %v, received %v", tt.val, tt.decode)
}
}
})
}
}