diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 6060e552a..8d6a44c7c 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -19,7 +19,6 @@ along with this program. If not, see package main import ( - "encoding/gob" "flag" "fmt" "log" @@ -157,7 +156,6 @@ func main() { log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) return } else { - gob.Register(engine.Destination{}) engine.SetHistoryScribe(scribeAgent) defer scribeAgent.Client.Close() } diff --git a/engine/calldesc.go b/engine/calldesc.go index 16ce1fd64..817b3ce86 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -96,9 +96,6 @@ func SetDebitPeriod(d time.Duration) { // Exported method to set the history scribe. func SetHistoryScribe(scribe history.Scribe) { - history.RegisterRecordFilename(&Destination{}) - history.RegisterRecordFilename(&RatingPlan{}) - history.RegisterRecordFilename(&RatingProfile{}) historyScribe = scribe } diff --git a/engine/destinations.go b/engine/destinations.go index 819fbc2be..baf4e06fa 100644 --- a/engine/destinations.go +++ b/engine/destinations.go @@ -18,7 +18,12 @@ along with this program. If not, see package engine -import "strings" +import ( + "encoding/json" + "strings" + + "github.com/cgrates/cgrates/history" +) /* Structure that gathers multiple destination prefixes under a common id. @@ -55,6 +60,11 @@ func (d *Destination) AddPrefix(pfx string) { } // history record method -func (d *Destination) GetId() string { - return d.Id +func (d *Destination) GetHistoryRecord() history.Record { + js, _ := json.Marshal(d) + return history.Record{ + Id: d.Id, + Filename: history.DESTINATIONS_FN, + Payload: js, + } } diff --git a/engine/history_test.go b/engine/history_test.go index 19f8a2712..aa599ae6e 100644 --- a/engine/history_test.go +++ b/engine/history_test.go @@ -27,7 +27,7 @@ import ( func TestHistoryRatinPlans(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - buf := scribe.BufMap["ratingprofiles.json"] + buf := scribe.BufMap[history.RATING_PROFILES_FN] if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) { t.Error("Error in destination history content:", buf.String()) } @@ -35,7 +35,7 @@ func TestHistoryRatinPlans(t *testing.T) { func TestHistoryDestinations(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - buf := scribe.BufMap["destinations.json"] + buf := scribe.BufMap[history.DESTINATIONS_FN] expected := `[{"Id":"ALL","Prefixes":["49","41","43"]}, {"Id":"GERMANY","Prefixes":["49"]}, {"Id":"GERMANY_O2","Prefixes":["41"]}, diff --git a/engine/ratingplan.go b/engine/ratingplan.go index a33b9a6d5..d0ddf2249 100644 --- a/engine/ratingplan.go +++ b/engine/ratingplan.go @@ -18,6 +18,12 @@ along with this program. If not, see package engine +import ( + "encoding/json" + + "github.com/cgrates/cgrates/history" +) + /* The struture that is saved to storage. */ @@ -99,6 +105,11 @@ func (rp *RatingPlan) Equal(o *RatingPlan) bool { } // history record method -func (rp *RatingPlan) GetId() string { - return rp.Id +func (rp *RatingPlan) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rp) + return history.Record{ + Id: rp.Id, + Filename: history.RATING_PLANS_FN, + Payload: js, + } } diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 159e5ea2e..12bd6ea5e 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -19,12 +19,14 @@ along with this program. If not, see package engine import ( + "encoding/json" "errors" "fmt" "sort" "time" "github.com/cgrates/cgrates/cache2go" + "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" ) @@ -148,6 +150,11 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error) } // history record method -func (rpf *RatingProfile) GetId() string { - return rpf.Id +func (rpf *RatingProfile) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rpf) + return history.Record{ + Id: rpf.Id, + Filename: history.RATING_PROFILES_FN, + Payload: js, + } } diff --git a/engine/storage_map.go b/engine/storage_map.go index 071966b36..78127a4f5 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -122,7 +122,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { result, err := ms.ms.Marshal(rp) ms.dict[RATING_PLAN_PREFIX+rp.Id] = result response := 0 - go historyScribe.Record(rp, &response) + go historyScribe.Record(rp.GetHistoryRecord(), &response) cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -150,7 +150,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { result, err := ms.ms.Marshal(rpf) ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 - go historyScribe.Record(rpf, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -179,7 +179,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { result, err := ms.ms.Marshal(dest) ms.dict[DESTINATION_PREFIX+dest.Id] = result response := 0 - go historyScribe.Record(dest, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 6438ae9dd..f136ad71b 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "time" + "labix.org/v2/mgo" "labix.org/v2/mgo/bson" ) @@ -136,7 +137,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { if historyScribe != nil { response := 0 - historyScribe.Record(rp, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingplans").Insert(rp) } @@ -150,7 +151,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { if historyScribe != nil { response := 0 - historyScribe.Record(rp, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingprofiles").Insert(rp) } @@ -167,7 +168,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err func (ms *MongoStorage) SetDestination(dest *Destination) error { if historyScribe != nil { response := 0 - historyScribe.Record(dest, &response) + historyScribe.Record(dest.GetHistoryRecord(), &response) } return ms.db.C("destinations").Insert(dest) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 99b2adefb..807d75e17 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -194,7 +194,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rp, &response) + go historyScribe.Record(rp.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return @@ -222,7 +222,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rpf, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return @@ -271,7 +271,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(dest, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) } //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return diff --git a/history/file_scribe.go b/history/file_scribe.go index 8dbe175b8..5694a97dc 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -50,7 +50,7 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er s.loopChecker = make(chan int) s.gitInit() - for fn, _ := range recordsMap { + for _, fn := range []string{DESTINATIONS_FN, RATING_PLANS_FN, RATING_PROFILES_FN} { if err := s.load(fn); err != nil { return nil, err } @@ -60,10 +60,8 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er func (s *FileScribe) Record(rec Record, out *int) error { s.mu.Lock() - fileToSave := GetRFN(rec) - if records, ok := recordsMap[fileToSave]; ok { - records.SetOrAdd(rec) - } + fileToSave := rec.Filename + recordsMap[fileToSave] = recordsMap[fileToSave].SetOrAdd(&rec) // flood protection for save method (do not save on every loop iteration) if s.waitingFile == fileToSave { diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 3524340fd..808298a8e 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -31,15 +31,15 @@ type MockScribe struct { func NewMockScribe() (*MockScribe, error) { return &MockScribe{BufMap: map[string]*bytes.Buffer{ - "destinations.json": bytes.NewBuffer(nil), - "ratingplans.json": bytes.NewBuffer(nil), - "ratingprofiles.json": bytes.NewBuffer(nil), + DESTINATIONS_FN: bytes.NewBuffer(nil), + RATING_PLANS_FN: bytes.NewBuffer(nil), + RATING_PROFILES_FN: bytes.NewBuffer(nil), }}, nil } func (s *MockScribe) Record(rec Record, out *int) error { - fn := GetRFN(rec) - recordsMap[fn] = recordsMap[fn].SetOrAdd(rec) + fn := rec.Filename + recordsMap[fn] = recordsMap[fn].SetOrAdd(&rec) s.save(fn) return nil } diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index c7be783ea..0d5b1b3dd 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -18,11 +18,7 @@ along with this program. If not, see package history -import ( - "encoding/gob" - "log" - "net/rpc" -) +import "net/rpc" type ProxyScribe struct { Client *rpc.Client @@ -37,12 +33,6 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) { return &ProxyScribe{Client: client}, nil } -func RRR(r interface{}) { - gob.Register(r) -} - func (ps *ProxyScribe) Record(rec Record, out *int) error { - err := ps.Client.Call("Scribe.Record", &rec, out) - log.Printf("Result for %v: %v", rec, err) - return err + return ps.Client.Call("Scribe.Record", rec, out) } diff --git a/history/scribe.go b/history/scribe.go index 97c8a29b1..e1bb9c3a4 100644 --- a/history/scribe.go +++ b/history/scribe.go @@ -19,22 +19,28 @@ along with this program. If not, see package history import ( - "encoding/json" "io" "reflect" "sort" - "strings" +) + +const ( + DESTINATIONS_FN = "destinations.json" + RATING_PLANS_FN = "rating_plans.json" + RATING_PROFILES_FN = "rating_profiles.json" ) type Scribe interface { Record(Record, *int) error } -type Record interface { - GetId() string +type Record struct { + Id string + Filename string + Payload []byte } -type records []Record +type records []*Record var ( recordsMap = make(map[string]records) @@ -50,18 +56,18 @@ func (rs records) Swap(i, j int) { } func (rs records) Less(i, j int) bool { - return rs[i].GetId() < rs[j].GetId() + return rs[i].Id < rs[j].Id } func (rs records) Sort() { sort.Sort(rs) } -func (rs records) SetOrAdd(rec Record) records { +func (rs records) SetOrAdd(rec *Record) records { //rs.Sort() n := len(rs) - i := sort.Search(n, func(i int) bool { return rs[i].GetId() >= rec.GetId() }) - if i < n && rs[i].GetId() == rec.GetId() { + i := sort.Search(n, func(i int) bool { return rs[i].Id >= rec.Id }) + if i < n && rs[i].Id == rec.Id { rs[i] = rec } else { // i is the index where it would be inserted. @@ -72,30 +78,11 @@ func (rs records) SetOrAdd(rec Record) records { return rs } -/*func (rs records) SetOrAdd(rec Record) records { - found := false - for _, r := range rs { - if r.GetId() == rec.GetId() { - found = true - r.Object = rec.Object - return rs - } - } - if !found { - rs = append(rs, rec) - } - return rs -}*/ - func format(b io.Writer, recs records) error { recs.Sort() b.Write([]byte("[")) for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) + b.Write(r.Payload) if i < len(recs)-1 { b.Write([]byte(",\n")) } @@ -103,18 +90,3 @@ func format(b io.Writer, recs records) error { b.Write([]byte("]")) return nil } - -func GetRFN(rec Record) string { - if fn, ok := filenameMap[reflect.TypeOf(rec)]; ok { - return fn - } else { - typ := reflect.TypeOf(rec) - typeSegments := strings.Split(typ.String(), ".") - fn = strings.ToLower(typeSegments[len(typeSegments)-1]) + "s.json" - filenameMap[typ] = fn - recordsMap[fn] = make(records, 0) - return fn - } -} - -var RegisterRecordFilename = GetRFN // will create a key in filename and records map diff --git a/history/scribe_test.go b/history/scribe_test.go index 1fa264f34..31011b98f 100644 --- a/history/scribe_test.go +++ b/history/scribe_test.go @@ -23,17 +23,9 @@ import ( "testing" ) -type TestRecord struct { - Id string -} - -func (tr *TestRecord) GetId() string { - return tr.Id -} - func TestHistorySet(t *testing.T) { - rs := records{&TestRecord{"first"}} - second := &TestRecord{"first"} + rs := records{&Record{Id: "first"}} + second := &Record{Id: "first"} rs.SetOrAdd(second) if len(rs) != 1 || rs[0] != second { t.Error("error setting new value: ", rs[0]) @@ -41,8 +33,8 @@ func TestHistorySet(t *testing.T) { } func TestHistoryAdd(t *testing.T) { - rs := records{&TestRecord{"first"}} - second := &TestRecord{"second"} + rs := records{&Record{Id: "first"}} + second := &Record{Id: "second"} rs = rs.SetOrAdd(second) if len(rs) != 2 || rs[1] != second { t.Error("error setting new value: ", rs) @@ -52,9 +44,9 @@ func TestHistoryAdd(t *testing.T) { func BenchmarkSetOrAdd(b *testing.B) { var rs records for i := 0; i < 1000; i++ { - rs = rs.SetOrAdd(&TestRecord{strconv.Itoa(i)}) + rs = rs.SetOrAdd(&Record{Id: strconv.Itoa(i)}) } for i := 0; i < b.N; i++ { - rs.SetOrAdd(&TestRecord{"400"}) + rs.SetOrAdd(&Record{Id: "400"}) } }