Adding SMGenericV1.SetGZIPpedPassiveSessions using GZIP compressed arguments

This commit is contained in:
DanB
2017-05-14 14:08:58 +02:00
parent 6faeee47dd
commit 4479eef14d
5 changed files with 58 additions and 1 deletions

View File

@@ -103,6 +103,6 @@ func (self *SMGenericBiRpcV1) ReplicateActiveSessions(clnt *rpc2.Client, args se
return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply)
}
func (self *SMGenericBiRpcV1) BiRPCV1ReplicatePassiveSessions(clnt *rpc2.Client, args sessionmanager.ArgsReplicateSessions, reply *string) error {
func (self *SMGenericBiRpcV1) ReplicatePassiveSessions(clnt *rpc2.Client, args sessionmanager.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicateActiveSessions(clnt, args, reply)
}

View File

@@ -90,6 +90,10 @@ func (self *SMGenericV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSe
return self.sm.BiRPCV1SetPassiveSessions(nil, args, reply)
}
func (self *SMGenericV1) SetGZIPpedPassiveSessions(args []byte, reply *string) error {
return self.sm.BiRPCV1SetGZIPpedPassiveSessions(nil, args, reply)
}
func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply)
}

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -1177,6 +1178,18 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(clnt rpcclient.RpcClientConnecti
return
}
// BiRPCV1SetGZIPpedPassiveSessions is used to handle GZIP compressed arguments to BiRPCV1SetPassiveSessions
// eg: if CallCosts are too big, sending them over network could introduce latency
func (smg *SMGeneric) BiRPCV1SetGZIPpedPassiveSessions(clnt rpcclient.RpcClientConnection, args []byte, reply *string) (err error) {
var argsSetPSS ArgsSetPassiveSessions
if dst, err := utils.GUnZIPContent(args); err != nil {
return err
} else if err := json.Unmarshal(dst, &argsSetPSS); err != nil {
return err
}
return smg.BiRPCV1SetPassiveSessions(clnt, argsSetPSS, reply)
}
type ArgsReplicateSessions struct {
Filter map[string]string
Connections []*config.HaPoolConfig

View File

@@ -20,6 +20,7 @@ package utils
import (
"archive/zip"
"bytes"
"compress/gzip"
"crypto/rand"
"crypto/sha1"
"encoding/gob"
@@ -27,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
@@ -361,6 +363,31 @@ func Unzip(src, dest string) error {
return nil
}
// ZIPContent compresses src into zipped slice of bytes
func GZIPContent(src []byte) (dst []byte, err error) {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err = gz.Write(src); err != nil {
return
}
if err = gz.Flush(); err != nil {
return
}
if err = gz.Close(); err != nil {
return
}
return b.Bytes(), nil
}
func GUnZIPContent(src []byte) (dst []byte, err error) {
rdata := bytes.NewReader(src)
var r *gzip.Reader
if r, err = gzip.NewReader(rdata); err != nil {
return
}
return ioutil.ReadAll(r)
}
// successive Fibonacci numbers.
func Fib() func() int {
a, b := 0, 1

View File

@@ -802,3 +802,16 @@ func TestCounterConcurrent(t *testing.T) {
t.Error("Counter was not reseted to 0")
}
}
func TestGZIPGUnZIP(t *testing.T) {
src := []byte("CGRateS.org")
gzipped, err := GZIPContent(src)
if err != nil {
t.Fatal(err)
}
if dst, err := GUnZIPContent(gzipped); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(src, dst) {
t.Error("not matching initial source")
}
}