mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
history flood/loop protection
This commit is contained in:
@@ -224,7 +224,11 @@ func startHistoryScribe() {
|
||||
scribeAgent = scribeServer
|
||||
}
|
||||
}
|
||||
engine.SetHistoryScribe(scribeAgent)
|
||||
if scribeAgent != nil {
|
||||
engine.SetHistoryScribe(scribeAgent)
|
||||
} else {
|
||||
engine.SetHistoryScribe(scribeServer) // if it is nil so be it
|
||||
}
|
||||
|
||||
if cfg.HistoryServerEnabled {
|
||||
if cfg.HistoryListen != INTERNAL {
|
||||
|
||||
@@ -115,9 +115,9 @@ func main() {
|
||||
log.Fatal("Could not connect to history server:" + err.Error())
|
||||
return
|
||||
} else {
|
||||
log.Print("HS: ", historyServer)
|
||||
engine.SetHistoryScribe(scribeAgent)
|
||||
gob.Register(&engine.Destination{})
|
||||
defer scribeAgent.Client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,8 @@ var (
|
||||
debitPeriod = 10 * time.Second
|
||||
roundingMethod = "*middle"
|
||||
roundingDecimals = 4
|
||||
historyScribe, _ = history.NewMockScribe()
|
||||
historyScribe history.Scribe
|
||||
//historyScribe, _ = history.NewMockScribe()
|
||||
)
|
||||
|
||||
// Exported method to set the storage getter.
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"log"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -26,6 +27,7 @@ import (
|
||||
|
||||
func init() {
|
||||
populateDB()
|
||||
historyScribe, _ = history.NewMockScribe()
|
||||
}
|
||||
|
||||
func populateDB() {
|
||||
|
||||
@@ -83,7 +83,7 @@ func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) {
|
||||
_, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result)
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response)
|
||||
historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -105,7 +105,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
|
||||
_, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, result)
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response)
|
||||
historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -41,15 +42,18 @@ type FileScribe struct {
|
||||
gitCommand string
|
||||
destinations records
|
||||
ratingProfiles records
|
||||
loopChecker chan int
|
||||
waitingFile string
|
||||
}
|
||||
|
||||
func NewFileScribe(fileRoot string) (Scribe, error) {
|
||||
func NewFileScribe(fileRoot string) (*FileScribe, error) {
|
||||
// looking for git
|
||||
gitCommand, err := exec.LookPath("git")
|
||||
if err != nil {
|
||||
return nil, errors.New("Please install git: " + err.Error())
|
||||
}
|
||||
s := &FileScribe{fileRoot: fileRoot, gitCommand: gitCommand}
|
||||
s.loopChecker = make(chan int)
|
||||
s.gitInit()
|
||||
if err := s.load(DESTINATIONS_FILE); err != nil {
|
||||
return nil, err
|
||||
@@ -63,14 +67,38 @@ func NewFileScribe(fileRoot string) (Scribe, error) {
|
||||
func (s *FileScribe) Record(rec *Record, out *int) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
var fileToSave string
|
||||
switch {
|
||||
case strings.HasPrefix(rec.Key, DESTINATION_PREFIX):
|
||||
s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object})
|
||||
s.save(DESTINATIONS_FILE)
|
||||
fileToSave = DESTINATIONS_FILE
|
||||
case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX):
|
||||
s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object})
|
||||
s.save(RATING_PROFILES_FILE)
|
||||
fileToSave = RATING_PROFILES_FILE
|
||||
}
|
||||
|
||||
// flood protection for save method (do not save on every loop iteration)
|
||||
if s.waitingFile == fileToSave {
|
||||
s.loopChecker <- 1
|
||||
}
|
||||
s.waitingFile = fileToSave
|
||||
go func() {
|
||||
t := time.NewTicker(1 * time.Second)
|
||||
select {
|
||||
case <-s.loopChecker:
|
||||
// cancel saving
|
||||
case <-t.C:
|
||||
if fileToSave != "" {
|
||||
s.save(fileToSave)
|
||||
}
|
||||
t.Stop()
|
||||
s.waitingFile = ""
|
||||
}
|
||||
}()
|
||||
// no protection variant
|
||||
/*if fileToSave != "" {
|
||||
s.save(fileToSave)
|
||||
}*/
|
||||
*out = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ type MockScribe struct {
|
||||
RpBuf bytes.Buffer
|
||||
}
|
||||
|
||||
func NewMockScribe() (Scribe, error) {
|
||||
func NewMockScribe() (*MockScribe, error) {
|
||||
return &MockScribe{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -30,10 +30,10 @@ const (
|
||||
)
|
||||
|
||||
type ProxyScribe struct {
|
||||
client *rpc.Client
|
||||
Client *rpc.Client
|
||||
}
|
||||
|
||||
func NewProxyScribe(addr, encoding string) (Scribe, error) {
|
||||
func NewProxyScribe(addr, encoding string) (*ProxyScribe, error) {
|
||||
var client *rpc.Client
|
||||
var err error
|
||||
switch encoding {
|
||||
@@ -48,9 +48,9 @@ func NewProxyScribe(addr, encoding string) (Scribe, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ProxyScribe{client: client}, nil
|
||||
return &ProxyScribe{Client: client}, nil
|
||||
}
|
||||
|
||||
func (ps *ProxyScribe) Record(rec *Record, out *int) error {
|
||||
return ps.client.Call("Scribe.Record", rec, out)
|
||||
return ps.Client.Call("Scribe.Record", rec, out)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user