Failover EEs/loggers improved + tests

Author: adi
Date: 2022-07-11 17:59:20 +03:00
Committed by: Dan Christian Bogos
Parent: 81793af5de
Commit: bf870b1e2b
35 changed files with 1046 additions and 699 deletions


@@ -24,25 +24,6 @@ import (
"time"
)
func TestCGREventHasField(t *testing.T) {
//empty check
cgrEvent := new(CGREvent)
rcv := cgrEvent.HasField("")
if rcv {
t.Error("Expecting: false, received: ", rcv)
}
//normal check
cgrEvent = &CGREvent{
Event: map[string]interface{}{
Usage: 20 * time.Second,
},
}
rcv = cgrEvent.HasField("Usage")
if !rcv {
t.Error("Expecting: true, received: ", rcv)
}
}
func TestCGREventCheckMandatoryFields(t *testing.T) {
//empty check
cgrEvent := new(CGREvent)


@@ -312,6 +312,10 @@ const (
CreatedAt = "CreatedAt"
UpdatedAt = "UpdatedAt"
NodeID = "NodeID"
//ExportLogger
Message = "Message"
Severity = "Severity"
Timestamp = "Timestamp"
//cores consts
ActiveGoroutines = "ActiveGoroutines"
MemoryUsage = "MemoryUsage"
@@ -385,6 +389,10 @@ const (
UnitCounters = "UnitCounters"
UpdateTime = "UpdateTime"
Rates = "Rates"
Format = "Format"
Conn = "Conn"
Level = "Level"
FailedPostsDir = "FailedPostsDir"
//DestinationRates = "DestinationRates"
RatingPlans = "RatingPlans"
RatingProfiles = "RatingProfiles"
@@ -420,28 +428,29 @@ const (
ConnBlocker = "ConnBlocker"
ConnParameters = "ConnParameters"
Thresholds = "Thresholds"
Routes = "Routes"
Attributes = "Attributes"
Chargers = "Chargers"
Dispatchers = "Dispatchers"
StatS = "StatS"
LoadIDsVrs = "LoadIDs"
GlobalVarS = "GlobalVarS"
CostSource = "CostSource"
ExtraInfo = "ExtraInfo"
Meta = "*"
MetaSysLog = "*syslog"
MetaStdLog = "*stdout"
MetaKafka = "*kafka"
EventSource = "EventSource"
AccountID = "AccountID"
AccountIDs = "AccountIDs"
ResourceID = "ResourceID"
TotalUsage = "TotalUsage"
StatID = "StatID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
Thresholds = "Thresholds"
Routes = "Routes"
Attributes = "Attributes"
Chargers = "Chargers"
Dispatchers = "Dispatchers"
StatS = "StatS"
LoadIDsVrs = "LoadIDs"
GlobalVarS = "GlobalVarS"
CostSource = "CostSource"
ExtraInfo = "ExtraInfo"
Meta = "*"
MetaSysLog = "*syslog"
MetaStdLog = "*stdout"
MetaKafkaLog = "*kafkaLog"
Kafka = "Kafka"
EventSource = "EventSource"
AccountID = "AccountID"
AccountIDs = "AccountIDs"
ResourceID = "ResourceID"
TotalUsage = "TotalUsage"
StatID = "StatID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
//BalanceDestinationIds = "BalanceDestinationIds"
BalanceCostIncrements = "BalanceCostIncrements"
BalanceAttributeIDs = "BalanceAttributeIDs"
@@ -1125,6 +1134,7 @@ const (
// AdminSv1 APIs
const (
AdminSv1ReplayFailedPosts = "AdminSv1.ReplayFailedPosts"
AdminSv1GetRateRatesIndexesHealth = "AdminSv1.GetRateRatesIndexesHealth"
AdminSv1GetChargerProfilesCount = "AdminSv1.GetChargerProfilesCount"
AdminSv1GetAccountsIndexesHealth = "AdminSv1.GetAccountsIndexesHealth"


@@ -987,17 +987,17 @@ func (pgnt Paginator) Clone() Paginator {
// GetPaginateOpts retrieves paginate options from the APIOpts map
func GetPaginateOpts(opts map[string]interface{}) (limit, offset, maxItems int, err error) {
if limitIface, has := opts[PageLimitOpt]; has {
if limit, err = IfaceAsTInt(limitIface); err != nil {
if limit, err = IfaceAsInt(limitIface); err != nil {
return
}
}
if offsetIface, has := opts[PageOffsetOpt]; has {
if offset, err = IfaceAsTInt(offsetIface); err != nil {
if offset, err = IfaceAsInt(offsetIface); err != nil {
return
}
}
if maxItemsIface, has := opts[PageMaxItemsOpt]; has {
if maxItems, err = IfaceAsTInt(maxItemsIface); err != nil {
if maxItems, err = IfaceAsInt(maxItemsIface); err != nil {
return
}
}
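For reference, a minimal usage sketch of GetPaginateOpts with the renamed IfaceAsInt; the option values are illustrative only, and the snippet assumes it sits next to GetPaginateOpts in the same package:
// Minimal usage sketch (illustrative values; an absent option keeps its zero value):
limit, offset, maxItems, err := GetPaginateOpts(map[string]interface{}{
	PageLimitOpt:  20,
	PageOffsetOpt: 40,
})
// err == nil, limit == 20, offset == 40, maxItems == 0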


@@ -19,52 +19,182 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path/filepath"
"path"
"sync"
"time"
"github.com/cgrates/ltcache"
)
type FailoverPoster struct {
MessageProvider
var failedPostCache *ltcache.Cache
func init() {
failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts)
}
func NewFailoverPoster( /*addMsg *MessageProvider*/ ) *FailoverPoster {
return new(FailoverPoster)
// SetFailedPostCacheTTL recreates the failed cache
func SetFailedPostCacheTTL(ttl time.Duration) {
failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts)
}
func (fldPst *FailoverPoster) AddMessage(failedPostDir, kafkaConn,
kafkaTopic string, content interface{}) (err error) {
filePath := filepath.Join(failedPostDir, kafkaTopic+PipeSep+MetaKafka+GOBSuffix)
var fileOut *os.File
if _, err = os.Stat(filePath); os.IsNotExist(err) {
fileOut, err = os.Create(filePath)
if err != nil {
return fmt.Errorf(fmt.Sprintf("<Kafka> failed to write logs to file <%s> because <%s>", filePath, err))
}
} else {
fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755)
if err != nil {
return err
}
func writeFailedPosts(_ string, value interface{}) {
expEv, canConvert := value.(*FailedExportersLogg)
if !canConvert {
return
}
enc := gob.NewEncoder(fileOut)
if err = enc.Encode(content); err != nil {
filePath := expEv.FilePath()
expEv.lk.RLock()
if err := expEv.WriteToFile(filePath); err != nil {
Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
filePath, err))
expEv.lk.RUnlock()
return
}
expEv.lk.RUnlock()
}
// FilePath returns the file path it should use for saving the failed events
func (expEv *FailedExportersLogg) FilePath() string {
return path.Join(expEv.FailedPostsDir, expEv.Module+PipeSep+UUIDSha1Prefix()+GOBSuffix)
}
// WriteToFile writes the events to file
func (expEv *FailedExportersLogg) WriteToFile(filePath string) (err error) {
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
encd := gob.NewEncoder(fileOut)
gob.Register(new(CGREvent))
err = encd.Encode(expEv)
fileOut.Close()
return
}
type MessageProvider interface {
GetContent() string
GetMeta() string
type FailedExportersLogg struct {
lk sync.RWMutex
Path string
Opts map[string]interface{} // metadata describing the export
Format string
Events []interface{}
FailedPostsDir string
Module string
}
func (fldPst *FailoverPoster) GetContent() string {
return EmptyString
func AddFailedMessage(failedPostsDir, expPath, format,
module string, ev interface{}, opts map[string]interface{}) {
key := ConcatenatedKey(failedPostsDir, expPath, format, module)
switch module {
case EEs:
// for amqp, amqpv1, s3, sqs and kafka, additionally separate the key by queue/topic ID
var amqpQueueID string
var s3BucketID string
var sqsQueueID string
var kafkaTopic string
if _, has := opts[AMQPQueueID]; has {
amqpQueueID = IfaceAsString(opts[AMQPQueueID])
}
if _, has := opts[S3Bucket]; has {
s3BucketID = IfaceAsString(opts[S3Bucket])
}
if _, has := opts[SQSQueueID]; has {
sqsQueueID = IfaceAsString(opts[SQSQueueID])
}
if _, has := opts[KafkaTopic]; has {
kafkaTopic = IfaceAsString(opts[KafkaTopic])
}
if qID := FirstNonEmpty(amqpQueueID, s3BucketID,
sqsQueueID, kafkaTopic); len(qID) != 0 {
key = ConcatenatedKey(key, qID)
}
case Kafka:
}
var failedPost *FailedExportersLogg
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
failedPost = x.(*FailedExportersLogg)
}
}
if failedPost == nil {
failedPost = &FailedExportersLogg{
Path: expPath,
Format: format,
Opts: opts,
Module: module,
FailedPostsDir: failedPostsDir,
}
failedPostCache.Set(key, failedPost, nil)
}
failedPost.AddEvent(ev)
}
func (fldPst *FailoverPoster) GetMeta() string {
return EmptyString
// AddEvent adds one event
func (expEv *FailedExportersLogg) AddEvent(ev interface{}) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
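To make the buffering path concrete, a hedged caller-side sketch: an exporter that could not deliver an event parks it through AddFailedMessage, and the ltcache TTL eviction later flushes the batch via writeFailedPosts. The directory, broker address and topic below are placeholders, and parkFailedEventSketch is hypothetical:
// Hypothetical caller; for amqp/amqpv1/s3/sqs/kafka the cache key additionally
// gets the queue/bucket/topic ID as a suffix.
func parkFailedEventSketch() {
	cgrEv := &CGREvent{
		Tenant: "cgrates.org",
		Event:  map[string]interface{}{AccountID: "1001"},
	}
	AddFailedMessage(
		"/var/spool/cgrates/failed_posts", // FailedPostsDir (placeholder)
		"localhost:9092",                  // export path (placeholder)
		MetaKafkaLog,                      // format
		EEs,                               // module
		cgrEv,                             // the event that failed to export
		map[string]interface{}{KafkaTopic: "cgrates_cdrs"}, // opts (placeholder topic)
	)
}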
// NewExportEventsFromFile returns a FailedExportersLogg decoded from the given file;
// used only when replaying failed posts
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
//err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
return nil, err
}
// }, config.CgrConfig().GeneralCfg().LockingTimeout, FileLockPrefix+filePath)
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// decode it
expEv = new(FailedExportersLogg)
err = dec.Decode(&expEv)
return
}
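A hedged roundtrip sketch of the gob persistence above, assuming it runs inside this package; gobRoundtripSketch and the field values are illustrative only:
// Write a batch with WriteToFile, then load it back with
// NewExportEventsFromFile (which also removes the file on success).
func gobRoundtripSketch() error {
	batch := &FailedExportersLogg{
		Path:           "localhost:9092",
		Format:         MetaKafkaLog,
		Module:         Kafka,
		FailedPostsDir: os.TempDir(),
		Events:         []interface{}{&CGREvent{Tenant: "cgrates.org"}},
	}
	fp := batch.FilePath() // <FailedPostsDir>/<Module>|<uuid-sha1><GOBSuffix>
	if err := batch.WriteToFile(fp); err != nil {
		return err
	}
	restored, err := NewExportEventsFromFile(fp)
	if err != nil {
		return err
	}
	fmt.Println(len(restored.Events)) // 1
	return nil
}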
type FailoverPoster interface {
ReplayFailedPosts(int) (*FailedExportersLogg, error)
}
// ReplayFailedPosts tries to post CDRs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (failedEvents *FailedExportersLogg, err error) {
/* failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
var ee EventExporter
if ee, err = NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return EmptyString }
if expEv.Format == MetaKafkajsonMap || expEv.Format == MetaS3jsonMap {
keyFunc = UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
failedEvents.AddEvent(ev)
}
}
ee.Close()
if len(failedEvents.Events) > 0 {
err = ErrPartiallyExecuted
} else {
failedEvents = nil
} */
return nil, nil
}
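The ReplayFailedPosts body is still parked in the comment above; as a hedged illustration only, a replay driver around NewExportEventsFromFile could look roughly like the sketch below. replayFailedPostsSketch and retry are hypothetical, filepath is an assumed extra import, and the real implementation goes through the ees EventExporter instead:
func replayFailedPostsSketch(failedPostsDir string, retry func(ev interface{}) error) error {
	files, err := filepath.Glob(filepath.Join(failedPostsDir, "*"+GOBSuffix))
	if err != nil {
		return err
	}
	for _, fp := range files {
		batch, err := NewExportEventsFromFile(fp) // decodes and removes the file
		if err != nil {
			return err
		}
		// keep only the events that still fail and park them again
		failed := &FailedExportersLogg{
			Path:           batch.Path,
			Format:         batch.Format,
			Module:         batch.Module,
			FailedPostsDir: batch.FailedPostsDir,
		}
		for _, ev := range batch.Events {
			if retry(ev) != nil {
				failed.AddEvent(ev)
			}
		}
		if len(failed.Events) != 0 {
			if err := failed.WriteToFile(failed.FilePath()); err != nil {
				return err
			}
		}
	}
	return nil
}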

utils/failover_export1.go (new file, 101 lines)

@@ -0,0 +1,101 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
/*
import (
"encoding/gob"
"fmt"
"os"
"path/filepath"
"sync"
)
type FailoverPoster struct {
sync.Mutex
mp MessageProvider // e.g kafka
}
func NewFailoverPoster(dataType, tenant, nodeID, conn, format, fldPostDir string,
logLvl, attempts int) *FailoverPoster {
fldPst := new(FailoverPoster)
switch dataType {
case MetaKafkaLog:
fldPst.mp = NewExportLogger(nodeID, tenant, logLvl, conn, format, attempts, fldPostDir)
}
return fldPst
}
func (fldPst *FailoverPoster) AddFailedMessage(content interface{}) (err error) {
fldPst.Lock()
meta := fldPst.mp.GetMeta()
filePath := filepath.Join(meta[FailedPostsDir].(string), meta[Format].(string)+
PipeSep+MetaKafkaLog+GOBSuffix)
var fileOut *os.File
if _, err = os.Stat(filePath); os.IsNotExist(err) {
fileOut, err = os.Create(filePath)
if err != nil {
return fmt.Errorf(fmt.Sprintf("<Kafka> failed to write logs to file <%s> because <%s>", filePath, err))
}
} else {
fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755)
if err != nil {
return err
}
}
failPoster := &FailoverPosterData{
MetaData: meta,
Content: content.(*CGREvent),
}
enc := gob.NewEncoder(fileOut)
err = enc.Encode(failPoster)
fileOut.Close()
fldPst.Unlock()
return
}
type MessageProvider interface {
GetContent(filePath string) (string, error)
GetMeta() map[string]interface{}
}
func NewMessageProvider(dataType string) (MessageProvider, error) {
switch dataType {
case MetaKafkaLog:
return new(ExportLogger), nil
default:
return nil, fmt.Errorf("Invalid Message Provider type in order to read the failed posts")
}
}
func (fldPst *FailoverPoster) GetContent(filePath string) (string, error) {
}
func (fldPst *FailoverPoster) GetMeta() string {
return EmptyString
}
// FailoverPosterData keeps the metadata and the content of a failed post; it is used when reading the gob file back to recover this information
type FailoverPosterData struct {
MetaData map[string]interface{}
Content *CGREvent
} */

utils/kafka_logger.go (new file, 234 lines)

@@ -0,0 +1,234 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
import (
"log/syslog"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/segmentio/kafka-go"
)
// ExportLogger logs to Kafka
type ExportLogger struct {
sync.Mutex
logLevel int
fldPostDir string
writer *kafka.Writer
nodeID string
tenant string
}
// NewExportLogger creates a logger that exports log events to Kafka
func NewExportLogger(nodeID, tenant string, level int,
connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) {
el = &ExportLogger{
logLevel: level,
fldPostDir: fldPostDir,
nodeID: nodeID,
tenant: tenant,
writer: &kafka.Writer{
Addr: kafka.TCP(connOpts),
Topic: connTopic,
MaxAttempts: attempts,
},
}
return
}
func (el *ExportLogger) Close() (err error) {
if el.writer != nil {
err = el.writer.Close()
el.writer = nil
}
return
}
func (el *ExportLogger) call(m string, level int) (err error) {
eventExport := &CGREvent{
Tenant: el.tenant,
Event: map[string]interface{}{
NodeID: el.nodeID,
Message: m,
Severity: level,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
},
}
// the event is exported to Kafka in JSON format
var content []byte
if content, err = ToUnescapedJSON(eventExport); err != nil {
return
}
if err = el.writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// if the Kafka write fails, park the event in the failed-posts directory
AddFailedMessage(el.fldPostDir, el.writer.Addr.String(), MetaKafkaLog, Kafka,
eventExport, el.GetMeta())
// and signal the caller to also print the content through a stdout logger
return ErrLoggerChanged
}
return
}
func (el *ExportLogger) Write(p []byte) (n int, err error) {
n = len(p)
err = el.call(string(p), 8)
return
}
func (el *ExportLogger) GetSyslog() *syslog.Writer {
return nil
}
// GetLogLevel returns the current log level of the logger
func (el *ExportLogger) GetLogLevel() int {
return el.logLevel
}
// SetLogLevel changes the log level
func (el *ExportLogger) SetLogLevel(level int) {
el.logLevel = level
}
// Alert logs to EEs with alert level
func (el *ExportLogger) Alert(m string) (err error) {
if el.logLevel < LOGLEVEL_ALERT {
return nil
}
if err = el.call(m, LOGLEVEL_ALERT); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Alert(m)
err = nil
}
}
return
}
// Crit logs to EEs with critical level
func (el *ExportLogger) Crit(m string) (err error) {
if el.logLevel < LOGLEVEL_CRITICAL {
return nil
}
if err = el.call(m, LOGLEVEL_CRITICAL); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Crit(m)
err = nil
}
}
return
}
// Debug logs to EEs with debug level
func (el *ExportLogger) Debug(m string) (err error) {
if el.logLevel < LOGLEVEL_DEBUG {
return nil
}
if err = el.call(m, LOGLEVEL_DEBUG); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Debug(m)
err = nil
}
}
return
}
// Emerg logs to EEs with emergency level
func (el *ExportLogger) Emerg(m string) (err error) {
if el.logLevel < LOGLEVEL_EMERGENCY {
return nil
}
if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Emerg(m)
err = nil
}
}
return
}
// Err logs to EEs with error level
func (el *ExportLogger) Err(m string) (err error) {
if el.logLevel < LOGLEVEL_ERROR {
return nil
}
if err = el.call(m, LOGLEVEL_ERROR); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Err(m)
err = nil
}
}
return
}
// Info logs to EEs with info level
func (el *ExportLogger) Info(m string) (err error) {
if el.logLevel < LOGLEVEL_INFO {
return nil
}
if err = el.call(m, LOGLEVEL_INFO); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Info(m)
err = nil
}
}
return
}
// Notice logs to EEs with notice level
func (el *ExportLogger) Notice(m string) (err error) {
if el.logLevel < LOGLEVEL_NOTICE {
return nil
}
if err = el.call(m, LOGLEVEL_NOTICE); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Notice(m)
err = nil
}
}
return
}
// Warning logs to EEs with warning level
func (el *ExportLogger) Warning(m string) (err error) {
if el.logLevel < LOGLEVEL_WARNING {
return nil
}
if err = el.call(m, LOGLEVEL_WARNING); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Warning(m)
err = nil
}
}
return
}
func (el *ExportLogger) GetMeta() map[string]interface{} {
return map[string]interface{}{
Level: el.logLevel,
Format: el.writer.Topic,
Conn: el.writer.Addr.String(),
FailedPostsDir: el.fldPostDir,
Attempts: el.writer.MaxAttempts,
}
}
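A hedged usage sketch for the new export logger; the broker address, topic and spool directory are placeholders, and exportLoggerSketch is hypothetical. When the Kafka write fails, call() parks the event through AddFailedMessage and returns ErrLoggerChanged, which the level methods translate into a one-off fallback through NewStdLogger:
func exportLoggerSketch() {
	el := NewExportLogger(
		"node1",                           // nodeID
		"cgrates.org",                     // tenant
		LOGLEVEL_INFO,                     // log level
		"localhost:9092",                  // Kafka connection (placeholder)
		"cgrates-logs",                    // Kafka topic (placeholder)
		3,                                 // write attempts
		"/var/spool/cgrates/failed_posts", // failed posts dir (placeholder)
	)
	defer el.Close()
	el.Warning("export path degraded") // LOGLEVEL_WARNING <= LOGLEVEL_INFO, so this is emitted
}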

utils/kafka_logger_test.go (new file, 507 lines)

@@ -0,0 +1,507 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
/*
func TestLoggerNewLoggerExport(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
exp := &ExportLogger{
logLevel: 6,
nodeID: "123",
tenant: "cgrates.org",
loggOpts: cfg.LoggerCfg().Opts,
writer: &kafka.Writer{
Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn),
Topic: cfg.LoggerCfg().Opts.KafkaTopic,
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
if rcv, err := NewLogger(utils.MetaKafka, "cgrates.org", "123", cfg.LoggerCfg()); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv.(*ExportLogger), exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
func TestLoggerNewLoggerDefault(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
experr := `unsupported logger: <invalid>`
if _, err := NewLogger("invalid", "cgrates.org", "123", cfg.LoggerCfg()); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err)
}
}
func TestLoggerNewExportLogger(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
exp := &ExportLogger{
logLevel: 7,
nodeID: "123",
tenant: "cgrates.org",
loggOpts: cfg.LoggerCfg().Opts,
writer: &kafka.Writer{
Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn),
Topic: cfg.LoggerCfg().Opts.KafkaTopic,
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts); !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
func TestLoggerExportEmerg(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Emergency message",
"Severity": utils.LOGLEVEL_EMERGENCY,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", -1, cfg.LoggerCfg().Opts)
if err := el.Emerg("Emergency message"); err != nil {
t.Error(err)
}
el.SetLogLevel(0)
if err := el.Emerg("Emergency message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportAlert(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Alert message",
"Severity": utils.LOGLEVEL_ALERT,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 0, cfg.LoggerCfg().Opts)
if err := el.Alert("Alert message"); err != nil {
t.Error(err)
}
el.SetLogLevel(1)
if err := el.Alert("Alert message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportCrit(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Critical message",
"Severity": utils.LOGLEVEL_CRITICAL,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 1, cfg.LoggerCfg().Opts)
if err := el.Crit("Critical message"); err != nil {
t.Error(err)
}
el.SetLogLevel(2)
if err := el.Crit("Critical message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportErr(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Error message",
"Severity": utils.LOGLEVEL_ERROR,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 2, cfg.LoggerCfg().Opts)
if err := el.Err("Error message"); err != nil {
t.Error(err)
}
el.SetLogLevel(3)
if err := el.Err("Error message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportWarning(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Warning message",
"Severity": utils.LOGLEVEL_WARNING,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 3, cfg.LoggerCfg().Opts)
if err := el.Warning("Warning message"); err != nil {
t.Error(err)
}
el.SetLogLevel(4)
if err := el.Warning("Warning message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportNotice(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Notice message",
"Severity": utils.LOGLEVEL_NOTICE,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 4, cfg.LoggerCfg().Opts)
if err := el.Notice("Notice message"); err != nil {
t.Error(err)
}
el.SetLogLevel(5)
if err := el.Notice("Notice message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportInfo(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Info message",
"Severity": utils.LOGLEVEL_INFO,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 5, cfg.LoggerCfg().Opts)
if err := el.Info("Info message"); err != nil {
t.Error(err)
}
el.SetLogLevel(6)
if err := el.Info("Info message"); err != nil {
t.Error(err)
}
}
func TestLoggerExportDebug(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "Debug message",
"Severity": utils.LOGLEVEL_DEBUG,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 6, cfg.LoggerCfg().Opts)
if err := el.Debug("Debug message"); err != nil {
t.Error(err)
}
el.SetLogLevel(7)
if err := el.Debug("Debug message"); err != nil {
t.Error(err)
}
}
func TestLoggerSetGetLogLevel(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
el := NewExportLogger("123", "cgrates.org", 6, cfg.LoggerCfg().Opts)
if rcv := el.GetLogLevel(); rcv != 6 {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv)
}
el.SetLogLevel(3)
if rcv := el.GetLogLevel(); rcv != 3 {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", 3, rcv)
}
}
func TestLoggerGetSyslog(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
el := NewExportLogger("123", "cgrates.org", 6, cfg.LoggerCfg().Opts)
if el.GetSyslog() != nil {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog())
}
}
func TestLoggerExportWrite(t *testing.T) {
tmp := Cache
defer func() {
Cache = tmp
}()
eesConn := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)
cfg := config.NewDefaultCGRConfig()
cfg.CoreSCfg().EEsConns = []string{eesConn}
Cache = NewCacheS(cfg, nil, nil)
cM := NewConnManager(cfg)
ccM := &ccMock{
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.EeSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
delete(args.(*utils.CGREventWithEeIDs).Event, "Timestamp")
exp := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.NodeID: "123",
"Message": "message",
"Severity": 8,
},
},
}
if !reflect.DeepEqual(exp, args) {
return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>",
utils.ToJSON(exp), utils.ToJSON(args))
}
return nil
},
},
}
rpcInternal := make(chan birpc.ClientConnector, 1)
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 8, cfg.LoggerCfg().Opts)
if _, err := el.Write([]byte("message")); err != nil {
t.Error(err)
}
el.Close()
} */


@@ -29,7 +29,8 @@ import (
var Logger LoggerInterface
func init() {
Logger, _ = NewLogger(MetaStdLog, EmptyString, 0)
Logger, _ = NewLogger(MetaStdLog, EmptyString, EmptyString, 0,
0, EmptyString, EmptyString, EmptyString)
}
// log severities following rfc3164
@@ -60,12 +61,15 @@ type LoggerInterface interface {
Write(p []byte) (n int, err error)
}
func NewLogger(loggerType, nodeID string, level int) (l LoggerInterface, err error) {
func NewLogger(loggerType, tenant, nodeID string, logLvl, attempts int, connOpts,
topicOpts, fldPostsDir string) (LoggerInterface, error) {
switch loggerType {
case MetaKafkaLog:
return NewExportLogger(nodeID, tenant, logLvl, connOpts, topicOpts, attempts, fldPostsDir), nil
case MetaStdLog:
return NewStdLogger(nodeID, level), nil
return NewStdLogger(nodeID, logLvl), nil
case MetaSysLog:
return NewSysLogger(nodeID, level)
return NewSysLogger(nodeID, logLvl)
default:
return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType)
}
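A short sketch of the widened NewLogger signature, mirroring the test calls further down; the Kafka connection values are placeholders and newLoggerSketch is hypothetical:
func newLoggerSketch() error {
	// *stdout and *syslog ignore the Kafka-specific arguments:
	stdLog, err := NewLogger(MetaStdLog, EmptyString, "node1", LOGLEVEL_DEBUG,
		0, EmptyString, EmptyString, EmptyString)
	if err != nil {
		return err
	}
	stdLog.Info("started")
	// *kafkaLog additionally needs tenant, connection, topic, attempts and a failed-posts dir:
	kfkLog, err := NewLogger(MetaKafkaLog, "cgrates.org", "node1", LOGLEVEL_INFO,
		3, "localhost:9092", "cgrates-logs", "/var/spool/cgrates/failed_posts")
	if err != nil {
		return err
	}
	return kfkLog.Info("export logger ready")
}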


@@ -35,7 +35,7 @@ func TestLoggerNewLoggerStdoutOK(t *testing.T) {
log.New(os.Stderr, EmptyString, log.LstdFlags),
},
}
if rcv, err := NewLogger(MetaStdLog, "1234", 7); err != nil {
if rcv, err := NewLogger(MetaStdLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
@@ -46,7 +46,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
exp := &SysLogger{
logLevel: 7,
}
if rcv, err := NewLogger(MetaSysLog, "1234", 7); err != nil {
if rcv, err := NewLogger(MetaSysLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
t.Error(err)
} else {
exp.syslog = rcv.GetSyslog()
@@ -58,7 +58,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
func TestLoggerNewLoggerUnsupported(t *testing.T) {
experr := `unsupported logger: <unsupported>`
if _, err := NewLogger("unsupported", "1234", 7); err == nil || err.Error() != experr {
if _, err := NewLogger("unsupported", EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err == nil || err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err)
}
}


@@ -194,7 +194,7 @@ func IfaceAsInt64(itm interface{}) (i int64, err error) {
}
// IfaceAsInt converts an interface value to int
func IfaceAsTInt(itm interface{}) (i int, err error) {
func IfaceAsInt(itm interface{}) (i int, err error) {
switch it := itm.(type) {
case int:
return it, nil
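A quick sketch of the renamed helper, matching the behaviour exercised in the test further down (a plain int passes through, a time.Duration converts via its nanosecond count, a bool is rejected):
i, _ := IfaceAsInt(42)                  // i == 42
d, _ := IfaceAsInt(2 * time.Nanosecond) // d == 2: durations convert through Nanoseconds()
_, err := IfaceAsInt(true)              // err: cannot convert field<bool>: true to int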


@@ -851,11 +851,11 @@ func TestIfaceAsTInt64Default(t *testing.T) {
}
func TestIfaceAsTIntDefault(t *testing.T) {
if _, err := IfaceAsTInt(true); err == nil || err.Error() != "cannot convert field<bool>: true to int" {
if _, err := IfaceAsInt(true); err == nil || err.Error() != "cannot convert field<bool>: true to int" {
t.Errorf("Expecting <cannot convert field<bool>: true to int> ,received: <%+v>", err)
}
var test time.Duration = 2147483647
response, _ := IfaceAsTInt(test)
response, _ := IfaceAsInt(test)
if !reflect.DeepEqual(response, int(test.Nanoseconds())) {
t.Errorf("Expected <%+v> ,received: <%+v>", test.Nanoseconds(), response)
}