From 4479eef14dd65ba5f8a5a15c34f54c703ca16445 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 14 May 2017 14:08:58 +0200 Subject: [PATCH] Adding SMGenericV1.SetGZIPpedPassiveSessions using GZIP compressed arguments --- apier/v1/smgenericbirpcv1.go | 2 +- apier/v1/smgenericv1.go | 4 ++++ sessionmanager/smgeneric.go | 13 +++++++++++++ utils/coreutils.go | 27 +++++++++++++++++++++++++++ utils/coreutils_test.go | 13 +++++++++++++ 5 files changed, 58 insertions(+), 1 deletion(-) diff --git a/apier/v1/smgenericbirpcv1.go b/apier/v1/smgenericbirpcv1.go index 7c13b2c15..4c53eaf31 100644 --- a/apier/v1/smgenericbirpcv1.go +++ b/apier/v1/smgenericbirpcv1.go @@ -103,6 +103,6 @@ func (self *SMGenericBiRpcV1) ReplicateActiveSessions(clnt *rpc2.Client, args se return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply) } -func (self *SMGenericBiRpcV1) BiRPCV1ReplicatePassiveSessions(clnt *rpc2.Client, args sessionmanager.ArgsReplicateSessions, reply *string) error { +func (self *SMGenericBiRpcV1) ReplicatePassiveSessions(clnt *rpc2.Client, args sessionmanager.ArgsReplicateSessions, reply *string) error { return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply) } diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 29607b1d5..9a6aff681 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -90,6 +90,10 @@ func (self *SMGenericV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSe return self.sm.BiRPCV1SetPassiveSessions(nil, args, reply) } +func (self *SMGenericV1) SetGZIPpedPassiveSessions(args []byte, reply *string) error { + return self.sm.BiRPCV1SetGZIPpedPassiveSessions(nil, args, reply) +} + func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error { return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply) } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 2e450dcb9..24d0ef0df 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -18,6 +18,7 @@ along with this program. If not, see package sessionmanager import ( + "encoding/json" "errors" "fmt" "reflect" @@ -1177,6 +1178,18 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(clnt rpcclient.RpcClientConnecti return } +// BiRPCV1SetGZIPpedPassiveSessions is used to handle GZIP compressed arguments to BiRPCV1SetPassiveSessions +// eg: if CallCosts are too big, sending them over network could introduce latency +func (smg *SMGeneric) BiRPCV1SetGZIPpedPassiveSessions(clnt rpcclient.RpcClientConnection, args []byte, reply *string) (err error) { + var argsSetPSS ArgsSetPassiveSessions + if dst, err := utils.GUnZIPContent(args); err != nil { + return err + } else if err := json.Unmarshal(dst, &argsSetPSS); err != nil { + return err + } + return smg.BiRPCV1SetPassiveSessions(clnt, argsSetPSS, reply) +} + type ArgsReplicateSessions struct { Filter map[string]string Connections []*config.HaPoolConfig diff --git a/utils/coreutils.go b/utils/coreutils.go index b628addcc..bd3f947df 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -20,6 +20,7 @@ package utils import ( "archive/zip" "bytes" + "compress/gzip" "crypto/rand" "crypto/sha1" "encoding/gob" @@ -27,6 +28,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "math" "os" @@ -361,6 +363,31 @@ func Unzip(src, dest string) error { return nil } +// ZIPContent compresses src into zipped slice of bytes +func GZIPContent(src []byte) (dst []byte, err error) { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + if _, err = gz.Write(src); err != nil { + return + } + if err = gz.Flush(); err != nil { + return + } + if err = gz.Close(); err != nil { + return + } + return b.Bytes(), nil +} + +func GUnZIPContent(src []byte) (dst []byte, err error) { + rdata := bytes.NewReader(src) + var r *gzip.Reader + if r, err = gzip.NewReader(rdata); err != nil { + return + } + return ioutil.ReadAll(r) +} + // successive Fibonacci numbers. func Fib() func() int { a, b := 0, 1 diff --git a/utils/coreutils_test.go b/utils/coreutils_test.go index e7b87737a..fb3d89729 100644 --- a/utils/coreutils_test.go +++ b/utils/coreutils_test.go @@ -802,3 +802,16 @@ func TestCounterConcurrent(t *testing.T) { t.Error("Counter was not reseted to 0") } } + +func TestGZIPGUnZIP(t *testing.T) { + src := []byte("CGRateS.org") + gzipped, err := GZIPContent(src) + if err != nil { + t.Fatal(err) + } + if dst, err := GUnZIPContent(gzipped); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(src, dst) { + t.Error("not matching initial source") + } +}