working variant for history server and gob enc

This commit is contained in:
Radu Ioan Fericean
2014-01-25 11:48:35 +02:00
parent 8177e64f8b
commit 3bbb67c72f
14 changed files with 79 additions and 103 deletions

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package main
import (
"encoding/gob"
"flag"
"fmt"
"log"
@@ -157,7 +156,6 @@ func main() {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {
gob.Register(engine.Destination{})
engine.SetHistoryScribe(scribeAgent)
defer scribeAgent.Client.Close()
}

View File

@@ -96,9 +96,6 @@ func SetDebitPeriod(d time.Duration) {
// Exported method to set the history scribe.
func SetHistoryScribe(scribe history.Scribe) {
history.RegisterRecordFilename(&Destination{})
history.RegisterRecordFilename(&RatingPlan{})
history.RegisterRecordFilename(&RatingProfile{})
historyScribe = scribe
}

View File

@@ -18,7 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import "strings"
import (
"encoding/json"
"strings"
"github.com/cgrates/cgrates/history"
)
/*
Structure that gathers multiple destination prefixes under a common id.
@@ -55,6 +60,11 @@ func (d *Destination) AddPrefix(pfx string) {
}
// history record method
func (d *Destination) GetId() string {
return d.Id
func (d *Destination) GetHistoryRecord() history.Record {
js, _ := json.Marshal(d)
return history.Record{
Id: d.Id,
Filename: history.DESTINATIONS_FN,
Payload: js,
}
}

View File

@@ -27,7 +27,7 @@ import (
func TestHistoryRatinPlans(t *testing.T) {
scribe := historyScribe.(*history.MockScribe)
buf := scribe.BufMap["ratingprofiles.json"]
buf := scribe.BufMap[history.RATING_PROFILES_FN]
if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) {
t.Error("Error in destination history content:", buf.String())
}
@@ -35,7 +35,7 @@ func TestHistoryRatinPlans(t *testing.T) {
func TestHistoryDestinations(t *testing.T) {
scribe := historyScribe.(*history.MockScribe)
buf := scribe.BufMap["destinations.json"]
buf := scribe.BufMap[history.DESTINATIONS_FN]
expected := `[{"Id":"ALL","Prefixes":["49","41","43"]},
{"Id":"GERMANY","Prefixes":["49"]},
{"Id":"GERMANY_O2","Prefixes":["41"]},

View File

@@ -18,6 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/json"
"github.com/cgrates/cgrates/history"
)
/*
The struture that is saved to storage.
*/
@@ -99,6 +105,11 @@ func (rp *RatingPlan) Equal(o *RatingPlan) bool {
}
// history record method
func (rp *RatingPlan) GetId() string {
return rp.Id
func (rp *RatingPlan) GetHistoryRecord() history.Record {
js, _ := json.Marshal(rp)
return history.Record{
Id: rp.Id,
Filename: history.RATING_PLANS_FN,
Payload: js,
}
}

View File

@@ -19,12 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/json"
"errors"
"fmt"
"sort"
"time"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
)
@@ -148,6 +150,11 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error)
}
// history record method
func (rpf *RatingProfile) GetId() string {
return rpf.Id
func (rpf *RatingProfile) GetHistoryRecord() history.Record {
js, _ := json.Marshal(rpf)
return history.Record{
Id: rpf.Id,
Filename: history.RATING_PROFILES_FN,
Payload: js,
}
}

View File

@@ -122,7 +122,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
result, err := ms.ms.Marshal(rp)
ms.dict[RATING_PLAN_PREFIX+rp.Id] = result
response := 0
go historyScribe.Record(rp, &response)
go historyScribe.Record(rp.GetHistoryRecord(), &response)
cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
return
}
@@ -150,7 +150,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
result, err := ms.ms.Marshal(rpf)
ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
go historyScribe.Record(rpf, &response)
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}
@@ -179,7 +179,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
result, err := ms.ms.Marshal(dest)
ms.dict[DESTINATION_PREFIX+dest.Id] = result
response := 0
go historyScribe.Record(dest, &response)
go historyScribe.Record(dest.GetHistoryRecord(), &response)
cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
return
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"log"
"time"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
)
@@ -136,7 +137,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) {
func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error {
if historyScribe != nil {
response := 0
historyScribe.Record(rp, &response)
historyScribe.Record(rp.GetHistoryRecord(), &response)
}
return ms.db.C("ratingplans").Insert(rp)
}
@@ -150,7 +151,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
if historyScribe != nil {
response := 0
historyScribe.Record(rp, &response)
historyScribe.Record(rp.GetHistoryRecord(), &response)
}
return ms.db.C("ratingprofiles").Insert(rp)
}
@@ -167,7 +168,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err
func (ms *MongoStorage) SetDestination(dest *Destination) error {
if historyScribe != nil {
response := 0
historyScribe.Record(dest, &response)
historyScribe.Record(dest.GetHistoryRecord(), &response)
}
return ms.db.C("destinations").Insert(dest)
}

View File

@@ -194,7 +194,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes())
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(rp, &response)
go historyScribe.Record(rp.GetHistoryRecord(), &response)
}
//cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
return
@@ -222,7 +222,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result)
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(rpf, &response)
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
}
//cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
@@ -271,7 +271,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes())
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(dest, &response)
go historyScribe.Record(dest.GetHistoryRecord(), &response)
}
//cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
return

View File

@@ -50,7 +50,7 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er
s.loopChecker = make(chan int)
s.gitInit()
for fn, _ := range recordsMap {
for _, fn := range []string{DESTINATIONS_FN, RATING_PLANS_FN, RATING_PROFILES_FN} {
if err := s.load(fn); err != nil {
return nil, err
}
@@ -60,10 +60,8 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er
func (s *FileScribe) Record(rec Record, out *int) error {
s.mu.Lock()
fileToSave := GetRFN(rec)
if records, ok := recordsMap[fileToSave]; ok {
records.SetOrAdd(rec)
}
fileToSave := rec.Filename
recordsMap[fileToSave] = recordsMap[fileToSave].SetOrAdd(&rec)
// flood protection for save method (do not save on every loop iteration)
if s.waitingFile == fileToSave {

View File

@@ -31,15 +31,15 @@ type MockScribe struct {
func NewMockScribe() (*MockScribe, error) {
return &MockScribe{BufMap: map[string]*bytes.Buffer{
"destinations.json": bytes.NewBuffer(nil),
"ratingplans.json": bytes.NewBuffer(nil),
"ratingprofiles.json": bytes.NewBuffer(nil),
DESTINATIONS_FN: bytes.NewBuffer(nil),
RATING_PLANS_FN: bytes.NewBuffer(nil),
RATING_PROFILES_FN: bytes.NewBuffer(nil),
}}, nil
}
func (s *MockScribe) Record(rec Record, out *int) error {
fn := GetRFN(rec)
recordsMap[fn] = recordsMap[fn].SetOrAdd(rec)
fn := rec.Filename
recordsMap[fn] = recordsMap[fn].SetOrAdd(&rec)
s.save(fn)
return nil
}

View File

@@ -18,11 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package history
import (
"encoding/gob"
"log"
"net/rpc"
)
import "net/rpc"
type ProxyScribe struct {
Client *rpc.Client
@@ -37,12 +33,6 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) {
return &ProxyScribe{Client: client}, nil
}
func RRR(r interface{}) {
gob.Register(r)
}
func (ps *ProxyScribe) Record(rec Record, out *int) error {
err := ps.Client.Call("Scribe.Record", &rec, out)
log.Printf("Result for %v: %v", rec, err)
return err
return ps.Client.Call("Scribe.Record", rec, out)
}

View File

@@ -19,22 +19,28 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package history
import (
"encoding/json"
"io"
"reflect"
"sort"
"strings"
)
const (
DESTINATIONS_FN = "destinations.json"
RATING_PLANS_FN = "rating_plans.json"
RATING_PROFILES_FN = "rating_profiles.json"
)
type Scribe interface {
Record(Record, *int) error
}
type Record interface {
GetId() string
type Record struct {
Id string
Filename string
Payload []byte
}
type records []Record
type records []*Record
var (
recordsMap = make(map[string]records)
@@ -50,18 +56,18 @@ func (rs records) Swap(i, j int) {
}
func (rs records) Less(i, j int) bool {
return rs[i].GetId() < rs[j].GetId()
return rs[i].Id < rs[j].Id
}
func (rs records) Sort() {
sort.Sort(rs)
}
func (rs records) SetOrAdd(rec Record) records {
func (rs records) SetOrAdd(rec *Record) records {
//rs.Sort()
n := len(rs)
i := sort.Search(n, func(i int) bool { return rs[i].GetId() >= rec.GetId() })
if i < n && rs[i].GetId() == rec.GetId() {
i := sort.Search(n, func(i int) bool { return rs[i].Id >= rec.Id })
if i < n && rs[i].Id == rec.Id {
rs[i] = rec
} else {
// i is the index where it would be inserted.
@@ -72,30 +78,11 @@ func (rs records) SetOrAdd(rec Record) records {
return rs
}
/*func (rs records) SetOrAdd(rec Record) records {
found := false
for _, r := range rs {
if r.GetId() == rec.GetId() {
found = true
r.Object = rec.Object
return rs
}
}
if !found {
rs = append(rs, rec)
}
return rs
}*/
func format(b io.Writer, recs records) error {
recs.Sort()
b.Write([]byte("["))
for i, r := range recs {
src, err := json.Marshal(r)
if err != nil {
return err
}
b.Write(src)
b.Write(r.Payload)
if i < len(recs)-1 {
b.Write([]byte(",\n"))
}
@@ -103,18 +90,3 @@ func format(b io.Writer, recs records) error {
b.Write([]byte("]"))
return nil
}
func GetRFN(rec Record) string {
if fn, ok := filenameMap[reflect.TypeOf(rec)]; ok {
return fn
} else {
typ := reflect.TypeOf(rec)
typeSegments := strings.Split(typ.String(), ".")
fn = strings.ToLower(typeSegments[len(typeSegments)-1]) + "s.json"
filenameMap[typ] = fn
recordsMap[fn] = make(records, 0)
return fn
}
}
var RegisterRecordFilename = GetRFN // will create a key in filename and records map

View File

@@ -23,17 +23,9 @@ import (
"testing"
)
type TestRecord struct {
Id string
}
func (tr *TestRecord) GetId() string {
return tr.Id
}
func TestHistorySet(t *testing.T) {
rs := records{&TestRecord{"first"}}
second := &TestRecord{"first"}
rs := records{&Record{Id: "first"}}
second := &Record{Id: "first"}
rs.SetOrAdd(second)
if len(rs) != 1 || rs[0] != second {
t.Error("error setting new value: ", rs[0])
@@ -41,8 +33,8 @@ func TestHistorySet(t *testing.T) {
}
func TestHistoryAdd(t *testing.T) {
rs := records{&TestRecord{"first"}}
second := &TestRecord{"second"}
rs := records{&Record{Id: "first"}}
second := &Record{Id: "second"}
rs = rs.SetOrAdd(second)
if len(rs) != 2 || rs[1] != second {
t.Error("error setting new value: ", rs)
@@ -52,9 +44,9 @@ func TestHistoryAdd(t *testing.T) {
func BenchmarkSetOrAdd(b *testing.B) {
var rs records
for i := 0; i < 1000; i++ {
rs = rs.SetOrAdd(&TestRecord{strconv.Itoa(i)})
rs = rs.SetOrAdd(&Record{Id: strconv.Itoa(i)})
}
for i := 0; i < b.N; i++ {
rs.SetOrAdd(&TestRecord{"400"})
rs.SetOrAdd(&Record{Id: "400"})
}
}