diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bf7cced5b..ac921e780 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -224,7 +224,11 @@ func startHistoryScribe() { scribeAgent = scribeServer } } - engine.SetHistoryScribe(scribeAgent) + if scribeAgent != nil { + engine.SetHistoryScribe(scribeAgent) + } else { + engine.SetHistoryScribe(scribeServer) // if it is nil so be it + } if cfg.HistoryServerEnabled { if cfg.HistoryListen != INTERNAL { diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 8c3305c59..ceba279a3 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -115,9 +115,9 @@ func main() { log.Fatal("Could not connect to history server:" + err.Error()) return } else { - log.Print("HS: ", historyServer) engine.SetHistoryScribe(scribeAgent) gob.Register(&engine.Destination{}) + defer scribeAgent.Client.Close() } } diff --git a/engine/calldesc.go b/engine/calldesc.go index db12794c1..5d1960e9d 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -55,7 +55,8 @@ var ( debitPeriod = 10 * time.Second roundingMethod = "*middle" roundingDecimals = 4 - historyScribe, _ = history.NewMockScribe() + historyScribe history.Scribe + //historyScribe, _ = history.NewMockScribe() ) // Exported method to set the storage getter. diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index f9d43f040..b0edea35d 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "github.com/cgrates/cgrates/history" "log" "testing" "time" @@ -26,6 +27,7 @@ import ( func init() { populateDB() + historyScribe, _ = history.NewMockScribe() } func populateDB() { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 25ce354a9..a1c41cf10 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -83,7 +83,7 @@ func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) { _, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) + historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) } return } @@ -105,7 +105,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { _, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) + historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) } return } diff --git a/history/file_scribe.go b/history/file_scribe.go index 171b3b0c1..c0f3c08f6 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -28,6 +28,7 @@ import ( "path/filepath" "strings" "sync" + "time" ) const ( @@ -41,15 +42,18 @@ type FileScribe struct { gitCommand string destinations records ratingProfiles records + loopChecker chan int + waitingFile string } -func NewFileScribe(fileRoot string) (Scribe, error) { +func NewFileScribe(fileRoot string) (*FileScribe, error) { // looking for git gitCommand, err := exec.LookPath("git") if err != nil { return nil, errors.New("Please install git: " + err.Error()) } s := &FileScribe{fileRoot: fileRoot, gitCommand: gitCommand} + s.loopChecker = make(chan int) s.gitInit() if err := s.load(DESTINATIONS_FILE); err != nil { return nil, err @@ -63,14 +67,38 @@ func NewFileScribe(fileRoot string) (Scribe, error) { func (s *FileScribe) Record(rec *Record, out *int) error { s.mu.Lock() defer s.mu.Unlock() + var fileToSave string switch { case strings.HasPrefix(rec.Key, DESTINATION_PREFIX): s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object}) - s.save(DESTINATIONS_FILE) + fileToSave = DESTINATIONS_FILE case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX): s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object}) - s.save(RATING_PROFILES_FILE) + fileToSave = RATING_PROFILES_FILE } + + // flood protection for save method (do not save on every loop iteration) + if s.waitingFile == fileToSave { + s.loopChecker <- 1 + } + s.waitingFile = fileToSave + go func() { + t := time.NewTicker(1 * time.Second) + select { + case <-s.loopChecker: + // cancel saving + case <-t.C: + if fileToSave != "" { + s.save(fileToSave) + } + t.Stop() + s.waitingFile = "" + } + }() + // no protection variant + /*if fileToSave != "" { + s.save(fileToSave) + }*/ *out = 0 return nil } diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 8f3c07592..152fce446 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -35,7 +35,7 @@ type MockScribe struct { RpBuf bytes.Buffer } -func NewMockScribe() (Scribe, error) { +func NewMockScribe() (*MockScribe, error) { return &MockScribe{}, nil } diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index 9514eaa90..b6e324827 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -30,10 +30,10 @@ const ( ) type ProxyScribe struct { - client *rpc.Client + Client *rpc.Client } -func NewProxyScribe(addr, encoding string) (Scribe, error) { +func NewProxyScribe(addr, encoding string) (*ProxyScribe, error) { var client *rpc.Client var err error switch encoding { @@ -48,9 +48,9 @@ func NewProxyScribe(addr, encoding string) (Scribe, error) { if err != nil { return nil, err } - return &ProxyScribe{client: client}, nil + return &ProxyScribe{Client: client}, nil } func (ps *ProxyScribe) Record(rec *Record, out *int) error { - return ps.client.Call("Scribe.Record", rec, out) + return ps.Client.Call("Scribe.Record", rec, out) }