Replayed failed logs improvements

This commit is contained in:
adi
2022-07-12 16:38:12 +03:00
committed by Dan Christian Bogos
parent bf870b1e2b
commit 2d5d01a664
6 changed files with 42 additions and 145 deletions

View File

@@ -751,6 +751,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} {
if optsEes.SQLMaxOpenConns != nil {
opts[utils.SQLMaxOpenConns] = *optsEes.SQLMaxOpenConns
}
if optsEes.MYSQLDSNParams != nil {
opts[utils.MYSQLDSNParams] = optsEes.MYSQLDSNParams
}
if optsEes.SQLConnMaxLifetime != nil {
opts[utils.SQLConnMaxLifetime] = optsEes.SQLConnMaxLifetime.String()
}

View File

@@ -4,20 +4,22 @@
"reply_timeout": "50s"
},
// "logger": {
// "type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
// "level": 6, // log verbosity level
// "opts": {
// "kafka_conn": "localhost:9092", // the connection through Kafka
// "kafka_topic": "TutorialTopic", // the Kafka topic to which events are exported
// "attempts": 1, // number of attempts of connecting
// },
//},
/*
"logger": {
"type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
"level": 6, // log verbosity level
"opts": {
"kafka_conn": "ldasdas:9092", // the connection through Kafka
"kafka_topic": "TutorialTopic", // the Kafka topic to which events are exported
"attempts": 1, // number of attempts of connecting
},
},
*/
"logger": {
"type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka>
"level": 6, // log verbosity level
},
},
"listen": {
"rpc_json": ":2012",

View File

@@ -286,7 +286,7 @@ func (expEv *FailedExportersEEs) AddEvent(ev interface{}) {
}
// ReplayFailedPosts tries to post CDRs again
func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (failedEvents *utils.FailedExportersLogg, err error) {
func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
eesFailedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,

View File

@@ -27,7 +27,9 @@ import (
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/ltcache"
"github.com/segmentio/kafka-go"
)
var failedPostCache *ltcache.Cache
@@ -143,14 +145,12 @@ func (expEv *FailedExportersLogg) AddEvent(ev interface{}) {
// used only when replaying failed posts
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
//err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
return nil, err
}
// }, config.CgrConfig().GeneralCfg().LockingTimeout, FileLockPrefix+filePath)
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshal it
expEv = new(FailedExportersLogg)
@@ -159,42 +159,33 @@ func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err e
}
type FailoverPoster interface {
ReplayFailedPosts(int) (*FailedExportersLogg, error)
ReplayFailedPosts(int) error
}
// ReplayFailedPosts tries to post CDRs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (failedEvents *FailedExportersLogg, err error) {
/* failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
var ee EventExporter
if ee, err = NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
nodeID := IfaceAsString(expEv.Opts[NodeID])
tnt := IfaceAsString(expEv.Opts[Tenant])
logLvl, err := IfaceAsInt(expEv.Opts[Level])
if err != nil {
return
}
keyFunc := func() string { return EmptyString }
if expEv.Format == MetaKafkajsonMap || expEv.Format == MetaS3jsonMap {
keyFunc = UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
failedEvents.AddEvent(ev)
expLogger := NewExportLogger(nodeID, tnt, logLvl,
expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
for _, event := range expEv.Events {
var content []byte
if content, err = ToUnescapedJSON(event); err != nil {
return
}
if err = expLogger.writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// on any Kafka error, store the event in the failed-posts directory
AddFailedMessage(expLogger.fldPostDir, expLogger.writer.Addr.String(), MetaKafkaLog, Kafka,
event, expLogger.GetMeta())
return nil
}
}
ee.Close()
if len(failedEvents.Events) > 0 {
err = ErrPartiallyExecuted
} else {
failedEvents = nil
} */
return nil, nil
return err
}

View File

@@ -1,101 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
/*
import (
"encoding/gob"
"fmt"
"os"
"path/filepath"
"sync"
)
type FailoverPoster struct {
sync.Mutex
mp MessageProvider // e.g kafka
}
func NewFailoverPoster(dataType, tenant, nodeID, conn, format, fldPostDir string,
logLvl, attempts int) *FailoverPoster {
fldPst := new(FailoverPoster)
switch dataType {
case MetaKafkaLog:
fldPst.mp = NewExportLogger(nodeID, tenant, logLvl, conn, format, attempts, fldPostDir)
}
return fldPst
}
func (fldPst *FailoverPoster) AddFailedMessage(content interface{}) (err error) {
fldPst.Lock()
meta := fldPst.mp.GetMeta()
filePath := filepath.Join(meta[FailedPostsDir].(string), meta[Format].(string)+
PipeSep+MetaKafkaLog+GOBSuffix)
var fileOut *os.File
if _, err = os.Stat(filePath); os.IsNotExist(err) {
fileOut, err = os.Create(filePath)
if err != nil {
return fmt.Errorf(fmt.Sprintf("<Kafka> failed to write logs to file <%s> because <%s>", filePath, err))
}
} else {
fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755)
if err != nil {
return err
}
}
failPoster := &FailoverPosterData{
MetaData: meta,
Content: content.(*CGREvent),
}
enc := gob.NewEncoder(fileOut)
err = enc.Encode(failPoster)
fileOut.Close()
fldPst.Unlock()
return
}
type MessageProvider interface {
GetContent(filePath string) (string, error)
GetMeta() map[string]interface{}
}
func NewMessageProvider(dataType string) (MessageProvider, error) {
switch dataType {
case MetaKafkaLog:
return new(ExportLogger), nil
default:
return nil, fmt.Errorf("Invalid Message Provider type in order to read the failed posts")
}
}
func (fldPst *FailoverPoster) GetContent(filePath string) (string, error) {
}
func (fldPst *FailoverPoster) GetMeta() string {
return EmptyString
}
// FailoverPosterData keeps the metadata and the content of a failed post. It is used when reading from the gob file to recover this information
type FailoverPosterData struct {
MetaData map[string]interface{}
Content *CGREvent
} */

View File

@@ -225,6 +225,8 @@ func (el *ExportLogger) Warning(m string) (err error) {
func (el *ExportLogger) GetMeta() map[string]interface{} {
return map[string]interface{}{
Tenant: el.tenant,
NodeID: el.nodeID,
Level: el.logLevel,
Format: el.writer.Topic,
Conn: el.writer.Addr.String(),