EFs service to logger/ees and APIS

This commit is contained in:
adi
2022-07-22 17:04:21 +03:00
committed by Dan Christian Bogos
parent 86d2f1476a
commit da358161a7
15 changed files with 702 additions and 115 deletions

View File

@@ -78,8 +78,10 @@ func (aL *actHTTPPost) execute(ctx *context.Context, data utils.MapStorage, _ st
var partExec bool
for _, pstr := range aL.pstrs {
if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr {
go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString)
} else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil {
go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString,
nil, aL.config.GeneralCfg().DefaultTenant)
} else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString,
nil, aL.config.GeneralCfg().DefaultTenant); err != nil {
if pstr.Cfg().FailedPostsDir != utils.MetaNone {
err = nil
} else {

View File

@@ -25,7 +25,6 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/utils"
)
@@ -44,18 +43,13 @@ type EfSv1 struct {
ping
}
type ArgsReplayFailedPosts struct {
TypeProvider string
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Modules []string // list of modules for which replay the requests, nil for all
// ProcessEvent will write into gob formnat file the Events that were failed to be exported.
func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error {
return efS.efs.V1ProcessEvent(ctx, args, reply)
}
func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error {
return nil
}
func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error {
// ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again.
func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *efs.ArgsReplayFailedPosts, reply *string) error {
failedPostsDir := efS.cfg.LoggerCfg().Opts.FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString {
failedPostsDir = *args.FailedRequestsInDir
@@ -87,8 +81,8 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts
}
}
filePath := path.Join(failedPostsDir, file.Name())
var expEv utils.FailoverPoster
if expEv, err = ees.NewFailoverPosterFromFile(filePath, args.TypeProvider); err != nil {
var expEv efs.FailoverPoster
if expEv, err = efs.NewFailoverPosterFromFile(filePath, args.TypeProvider, efS.efs); err != nil {
return err
}
// check if the failed out dir path is the same as the same in dir in order to export again in case of failure
@@ -97,9 +91,9 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts
failoverPath = path.Join(failedOutDir, file.Name())
}
err = expEv.ReplayFailedPosts(efS.cfg.EFsCfg().PosterAttempts)
err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant)
if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
if err = utils.WriteToFile(failoverPath, expEv); err != nil {
if err = efs.WriteToFile(failoverPath, expEv); err != nil {
return utils.NewErrServerError(err)
}
}

View File

@@ -197,7 +197,7 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
utils.EEs, ee.Cfg().ID))
}
go func(evict, sync bool, ee EventExporter) {
if err := exportEventWithExporter(ctx, ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS); err != nil {
if err := exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil {
withErr = true
}
if sync {
@@ -243,7 +243,8 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
return
}
func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *engine.ConnManager,
ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS, tnt string) (err error) {
defer func() {
updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone,
cfg.GeneralCfg().DefaultTimezone))
@@ -276,16 +277,34 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.
}
extraData := exp.ExtraData(ev)
return ExportWithAttempts(ctx, exp, eEv, extraData)
return ExportWithAttempts(ctx, exp, eEv, extraData, connMngr, tnt)
}
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}) (err error) {
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{},
connMnngr *engine.ConnManager, tnt string) (err error) {
if exp.Cfg().FailedPostsDir != utils.MetaNone {
defer func() {
if err != nil {
utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
exp.Cfg().Type, utils.EEs,
eEv, exp.Cfg().Opts.AsMapInterface())
args := &utils.ArgsFailedPosts{
Tenant: tnt,
Path: exp.Cfg().ExportPath,
Event: eEv,
FailedDir: exp.Cfg().FailedPostsDir,
Module: utils.EEs,
APIOpts: exp.Cfg().Opts.AsMapInterface(),
}
var reply string
if err = connMnngr.Call(ctx, exp.Cfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>",
utils.EEs, exp.Cfg().ID, utils.EFs, err.Error()))
/*
utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
exp.Cfg().Type, utils.EEs,
eEv, exp.Cfg().Opts.AsMapInterface())
*/
}
}
}()
}
@@ -431,7 +450,7 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
// exported will be true if there will be at least one exporter archived
exported = true
if err = exportEventWithExporter(ctx, ee, cgrEv, false, eeS.cfg, eeS.fltrS); err != nil {
if err = exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv, false, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil {
return err
}
}

View File

@@ -18,18 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"bytes"
"encoding/gob"
"os"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
/*
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) {
@@ -321,3 +310,4 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
}
return
}
*/

View File

@@ -106,7 +106,7 @@ func TestNatsEE(t *testing.T) {
"Destination": "1002",
},
}
if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)
@@ -172,7 +172,7 @@ func TestNatsEE2(t *testing.T) {
"Destination": "1002",
},
}
if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)

View File

@@ -160,7 +160,7 @@ func TestSQSPoster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
@@ -239,7 +239,7 @@ func TestS3Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key, nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
key += ".json"
@@ -294,7 +294,7 @@ func TestAMQPv1Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
// Create client

View File

@@ -21,8 +21,10 @@ package efs
import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type EfS struct {
@@ -38,3 +40,63 @@ func NewEfs(cfg *config.CGRConfig, connMgr *engine.ConnManager) *EfS {
connMgr: connMgr,
}
}
// V1ProcessEvent will write into gob formnat file the Events that were failed to be exported.
func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error {
var format string
if _, has := args.APIOpts[utils.Format]; has {
format = utils.IfaceAsString(args.APIOpts[utils.Format])
}
key := utils.ConcatenatedKey(args.FailedDir, args.Path, format, args.Module)
switch args.Module {
case utils.EEs:
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
var amqpQueueID string
var s3BucketID string
var sqsQueueID string
var kafkaTopic string
if _, has := args.APIOpts[utils.AMQPQueueID]; has {
amqpQueueID = utils.IfaceAsString(args.APIOpts[utils.AMQPQueueID])
}
if _, has := args.APIOpts[utils.S3Bucket]; has {
s3BucketID = utils.IfaceAsString(args.APIOpts[utils.S3Bucket])
}
if _, has := args.APIOpts[utils.SQSQueueID]; has {
sqsQueueID = utils.IfaceAsString(args.APIOpts[utils.SQSQueueID])
}
if _, has := args.APIOpts[kafkaTopic]; has {
kafkaTopic = utils.IfaceAsString(args.APIOpts[utils.KafkaTopic])
}
if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID,
sqsQueueID, kafkaTopic); len(qID) != 0 {
key = utils.ConcatenatedKey(key, qID)
}
case utils.Kafka:
}
var failedPost *FailedExportersLogg
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
failedPost = x.(*FailedExportersLogg)
}
}
if failedPost == nil {
failedPost = &FailedExportersLogg{
Path: args.Path,
Format: format,
Opts: args.APIOpts,
Module: args.Module,
FailedPostsDir: args.FailedDir,
}
failedPostCache.Set(key, failedPost, nil)
}
failedPost.AddEvent(args.Event)
return nil
}
type ArgsReplayFailedPosts struct {
Tenant string
TypeProvider string
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Modules []string // list of modules for which replay the requests, nil for all
}

286
efs/failed_ees.go Normal file
View File

@@ -0,0 +1,286 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package efs
import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// FailedExportersEEs used to save the failed post to file
type FailedExportersEEs struct {
lk sync.RWMutex
Path string
Opts *config.EventExporterOpts
Format string
Events []interface{}
failedPostsDir string
module string
connMngr *engine.ConnManager
}
func AsOptsEESConfig(opts map[string]interface{}) (*config.EventExporterOpts, error) {
optsCfg := new(config.EventExporterOpts)
if len(opts) == 0 {
return optsCfg, nil
}
if _, has := opts[utils.CSVFieldSepOpt]; has {
optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt))
}
if _, has := opts[utils.ElsIndex]; has {
optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex))
}
if _, has := opts[utils.ElsIfPrimaryTerm]; has {
x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm)
if err != nil {
return nil, err
}
optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x)
}
if _, has := opts[utils.ElsIfSeqNo]; has {
x, err := utils.IfaceAsInt(utils.ElsIfSeqNo)
if err != nil {
return nil, err
}
optsCfg.ElsIfSeqNo = utils.IntPointer(x)
}
if _, has := opts[utils.ElsOpType]; has {
optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType))
}
if _, has := opts[utils.ElsPipeline]; has {
optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline))
}
if _, has := opts[utils.ElsRouting]; has {
optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting))
}
if _, has := opts[utils.ElsTimeout]; has {
t, err := utils.IfaceAsDuration(utils.ElsTimeout)
if err != nil {
return nil, err
}
optsCfg.ElsTimeout = &t
}
if _, has := opts[utils.ElsVersionLow]; has {
x, err := utils.IfaceAsInt(utils.ElsVersionLow)
if err != nil {
return nil, err
}
optsCfg.ElsVersion = utils.IntPointer(x)
}
if _, has := opts[utils.ElsVersionType]; has {
optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType))
}
if _, has := opts[utils.ElsWaitForActiveShards]; has {
optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards))
}
if _, has := opts[utils.SQLMaxIdleConnsCfg]; has {
x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg)
if err != nil {
return nil, err
}
optsCfg.SQLMaxIdleConns = utils.IntPointer(x)
}
if _, has := opts[utils.SQLMaxOpenConns]; has {
x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns)
if err != nil {
return nil, err
}
optsCfg.SQLMaxOpenConns = utils.IntPointer(x)
}
if _, has := opts[utils.SQLConnMaxLifetime]; has {
t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime)
if err != nil {
return nil, err
}
optsCfg.SQLConnMaxLifetime = &t
}
if _, has := opts[utils.MYSQLDSNParams]; has {
optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string)
}
if _, has := opts[utils.SQLTableNameOpt]; has {
optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt))
}
if _, has := opts[utils.SQLDBNameOpt]; has {
optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt))
}
if _, has := opts[utils.PgSSLModeCfg]; has {
optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg))
}
if _, has := opts[utils.KafkaTopic]; has {
optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic))
}
if _, has := opts[utils.AMQPQueueID]; has {
optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID))
}
if _, has := opts[utils.AMQPRoutingKey]; has {
optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey))
}
if _, has := opts[utils.AMQPExchange]; has {
optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange))
}
if _, has := opts[utils.AMQPExchangeType]; has {
optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType))
}
if _, has := opts[utils.AWSRegion]; has {
optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion))
}
if _, has := opts[utils.AWSKey]; has {
optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey))
}
if _, has := opts[utils.AWSSecret]; has {
optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret))
}
if _, has := opts[utils.AWSToken]; has {
optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken))
}
if _, has := opts[utils.SQSQueueID]; has {
optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID))
}
if _, has := opts[utils.S3Bucket]; has {
optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket))
}
if _, has := opts[utils.S3FolderPath]; has {
optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath))
}
if _, has := opts[utils.NatsJetStream]; has {
x, err := utils.IfaceAsBool(utils.NatsJetStream)
if err != nil {
return nil, err
}
optsCfg.NATSJetStream = utils.BoolPointer(x)
}
if _, has := opts[utils.NatsSubject]; has {
optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject))
}
if _, has := opts[utils.NatsJWTFile]; has {
optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile))
}
if _, has := opts[utils.NatsSeedFile]; has {
optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile))
}
if _, has := opts[utils.NatsCertificateAuthority]; has {
optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority))
}
if _, has := opts[utils.NatsClientCertificate]; has {
optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate))
}
if _, has := opts[utils.NatsClientKey]; has {
optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey))
}
if _, has := opts[utils.NatsJetStreamMaxWait]; has {
t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait)
if err != nil {
return nil, err
}
optsCfg.NATSJetStreamMaxWait = &t
}
if _, has := opts[utils.RpcCodec]; has {
optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec))
}
if _, has := opts[utils.ServiceMethod]; has {
optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod))
}
if _, has := opts[utils.KeyPath]; has {
optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath))
}
if _, has := opts[utils.CertPath]; has {
optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath))
}
if _, has := opts[utils.CaPath]; has {
optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath))
}
if _, has := opts[utils.Tls]; has {
x, err := utils.IfaceAsBool(utils.Tls)
if err != nil {
return nil, err
}
optsCfg.TLS = utils.BoolPointer(x)
}
if _, has := opts[utils.ConnIDs]; has {
optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string)
}
if _, has := opts[utils.RpcConnTimeout]; has {
t, err := utils.IfaceAsDuration(utils.RpcConnTimeout)
if err != nil {
return nil, err
}
optsCfg.RPCConnTimeout = &t
}
if _, has := opts[utils.RpcReplyTimeout]; has {
t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout)
if err != nil {
return nil, err
}
optsCfg.RPCReplyTimeout = &t
}
if _, has := opts[utils.RPCAPIOpts]; has {
optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]interface{})
}
return optsCfg, nil
}
// AddEvent adds one event
func (expEv *FailedExportersEEs) AddEvent(ev interface{}) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
eesFailedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
var ee ees.EventExporter
if ee, err = ees.NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }
if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap {
keyFunc = utils.UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil {
eesFailedEvents.AddEvent(ev)
}
}
ee.Close()
if len(eesFailedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
eesFailedEvents = nil
}
return
}

111
efs/failed_loggs.go Normal file
View File

@@ -0,0 +1,111 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package efs
import (
"bytes"
"encoding/gob"
"os"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/segmentio/kafka-go"
)
// FailedExportersLogg is a failover poster for kafka logger type
type FailedExportersLogg struct {
lk sync.RWMutex
Path string
Opts map[string]interface{} // this is meta
Format string
Events []interface{}
FailedPostsDir string
Module string
efsConns []string
connMngr *engine.ConnManager
}
// AddEvent adds one event
func (expEv *FailedExportersLogg) AddEvent(ev interface{}) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// NewExportEventsFromFile returns ExportEvents from the file
// used only on replay failed post
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
if fileContent, err = os.ReadFile(filePath); err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
return nil, err
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv = new(FailedExportersLogg)
err = dec.Decode(&expEv)
return
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID])
logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level])
if err != nil {
return
}
expLogger := utils.NewExportLogger(nodeID, tnt, logLvl,
expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
for _, event := range expEv.Events {
var content []byte
if content, err = utils.ToUnescapedJSON(event); err != nil {
return
}
if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(utils.GenUUID()),
Value: content,
}); err != nil {
var reply string
// if there are any errors in kafka, we will post in FailedPostDirectory
if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent,
&utils.ArgsFailedPosts{
Tenant: tnt,
Path: expLogger.Writer.Addr.String(),
Event: event,
FailedDir: expLogger.FldPostDir,
Module: utils.Kafka,
APIOpts: expLogger.GetMeta(),
}, &reply); err != nil {
return err
/*
utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils.
MetaKafkaLog, utils.Kafka,
event, expLogger.GetMeta())
*/
}
return nil
}
}
return err
}

124
efs/libefs.go Normal file
View File

@@ -0,0 +1,124 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package efs
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
var failedPostCache *ltcache.Cache
func init() {
failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts)
}
// SetFailedPostCacheTTL recreates the failed cache
func SetFailedPostCacheTTL(ttl time.Duration) {
failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts)
}
func writeFailedPosts(_ string, value interface{}) {
expEv, canConvert := value.(*FailedExportersLogg)
if !canConvert {
return
}
filePath := expEv.FilePath()
expEv.lk.RLock()
if err := WriteToFile(filePath, expEv); err != nil {
utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
filePath, err))
expEv.lk.RUnlock()
return
}
expEv.lk.RUnlock()
}
// FilePath returns the file path it should use for saving the failed events
func (expEv *FailedExportersLogg) FilePath() string {
return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
}
type FailoverPoster interface {
ReplayFailedPosts(*context.Context, int, string) error
}
// WriteToFile writes the events to file
func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
encd := gob.NewEncoder(fileOut)
gob.Register(new(utils.CGREvent))
err = encd.Encode(expEv)
fileOut.Close()
return
}
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) {
var fileContent []byte
err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return err
}
return os.Remove(filePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
return
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv := new(FailedExportersLogg)
err = dec.Decode(&expEv)
switch providerType {
case utils.EEs:
opts, err := AsOptsEESConfig(expEv.Opts)
if err != nil {
return nil, err
}
failPoster = &FailedExportersEEs{
module: expEv.Module,
failedPostsDir: expEv.FailedPostsDir,
Path: expEv.Path,
Opts: opts,
Events: expEv.Events,
Format: expEv.Format,
connMngr: efs.connMgr,
}
case utils.Kafka:
expEv.efsConns = efs.cfg.LoggerCfg().EFsConns
expEv.connMngr = efs.connMgr
failPoster = expEv
}
return
}

View File

@@ -30,7 +30,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/utils"
@@ -203,19 +203,19 @@ func testCDRsPostFailoverToFile(t *testing.T) {
fileName := file.Name()
filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName)
ev, err := ees.NewFailoverPosterFromFile(filePath, utils.EEs)
ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs)
if err != nil {
t.Errorf("<%s> for file <%s>", err, fileName)
continue
} else if len(ev.(*ees.FailedExportersEEs).Events) == 0 {
} else if len(ev.(*efs.FailedExportersEEs).Events) == 0 {
t.Error("Expected at least one event")
continue
}
if ev.(*ees.FailedExportersEEs).Format != utils.MetaS3jsonMap {
t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*ees.FailedExportersEEs).Format)
if ev.(*efs.FailedExportersEEs).Format != utils.MetaS3jsonMap {
t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*efs.FailedExportersEEs).Format)
}
if len(ev.(*ees.FailedExportersEEs).Events) != 3 {
t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*ees.FailedExportersEEs).Events))
if len(ev.(*efs.FailedExportersEEs).Events) != 3 {
t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*efs.FailedExportersEEs).Events))
}
}
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
@@ -364,7 +365,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
utils.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
cgr.cfg.LazySanityCheck()

View File

@@ -780,6 +780,15 @@ func IsURL(path string) bool {
strings.HasPrefix(path, "http://")
}
type ArgsFailedPosts struct {
Tenant string
Path string // Path of the exported type
Event interface{} // Event that must be written in file
FailedDir string // Directory that contains the file with Failed post
Module string // Type of efs <*ees|*kafkaLogger>
APIOpts map[string]interface{} // Specially for the meta
}
// GetIndexesArg the API argumets to specify an index
type GetIndexesArg struct {
IdxItmType string

View File

@@ -18,20 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/ltcache"
"github.com/segmentio/kafka-go"
)
/*
var failedPostCache *ltcache.Cache
func init() {
@@ -159,13 +146,12 @@ func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err e
}
type FailoverPoster interface {
ReplayFailedPosts(int) error
ReplayFailedPosts(int, string) error
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) {
nodeID := IfaceAsString(expEv.Opts[NodeID])
tnt := IfaceAsString(expEv.Opts[Tenant])
logLvl, err := IfaceAsInt(expEv.Opts[Level])
if err != nil {
return
@@ -177,15 +163,16 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
if content, err = ToUnescapedJSON(event); err != nil {
return
}
if err = expLogger.writer.WriteMessages(context.Background(), kafka.Message{
if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
AddFailedMessage(expLogger.fldPostDir, expLogger.writer.Addr.String(), MetaKafkaLog, Kafka,
AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka,
event, expLogger.GetMeta())
return nil
}
}
return err
}
*/

View File

@@ -31,22 +31,22 @@ import (
type ExportLogger struct {
sync.Mutex
logLevel int
fldPostDir string
writer *kafka.Writer
nodeID string
tenant string
LogLevel int
FldPostDir string
Writer *kafka.Writer
NodeID string
Tenant string
}
// NewExportLogger will export loggers to kafka
func NewExportLogger(nodeID, tenant string, level int,
connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) {
el = &ExportLogger{
logLevel: level,
fldPostDir: fldPostDir,
nodeID: nodeID,
tenant: tenant,
writer: &kafka.Writer{
LogLevel: level,
FldPostDir: fldPostDir,
NodeID: nodeID,
Tenant: tenant,
Writer: &kafka.Writer{
Addr: kafka.TCP(connOpts),
Topic: connTopic,
MaxAttempts: attempts,
@@ -56,18 +56,18 @@ func NewExportLogger(nodeID, tenant string, level int,
}
func (el *ExportLogger) Close() (err error) {
if el.writer != nil {
err = el.writer.Close()
el.writer = nil
if el.Writer != nil {
err = el.Writer.Close()
el.Writer = nil
}
return
}
func (el *ExportLogger) call(m string, level int) (err error) {
eventExport := &CGREvent{
Tenant: el.tenant,
Tenant: el.Tenant,
Event: map[string]interface{}{
NodeID: el.nodeID,
NodeID: el.NodeID,
Message: m,
Severity: level,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
@@ -78,15 +78,17 @@ func (el *ExportLogger) call(m string, level int) (err error) {
if content, err = ToUnescapedJSON(eventExport); err != nil {
return
}
if err = el.writer.WriteMessages(context.Background(), kafka.Message{
if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
AddFailedMessage(el.fldPostDir, el.writer.Addr.String(), MetaKafkaLog, Kafka,
eventExport, el.GetMeta())
// also the content should be printed as a stdout logger type
return ErrLoggerChanged
/*
// if there are any errors in kafka, we will post in FailedPostDirectory
AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka,
eventExport, el.GetMeta())
// also the content should be printed as a stdout logger type
return ErrLoggerChanged
*/
}
return
}
@@ -103,22 +105,22 @@ func (sl *ExportLogger) GetSyslog() *syslog.Writer {
// GetLogLevel() returns the level logger number for the server
func (el *ExportLogger) GetLogLevel() int {
return el.logLevel
return el.LogLevel
}
// SetLogLevel changes the log level
func (el *ExportLogger) SetLogLevel(level int) {
el.logLevel = level
el.LogLevel = level
}
// Alert logs to EEs with alert level
func (el *ExportLogger) Alert(m string) (err error) {
if el.logLevel < LOGLEVEL_ALERT {
if el.LogLevel < LOGLEVEL_ALERT {
return nil
}
if err = el.call(m, LOGLEVEL_ALERT); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Alert(m)
NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
err = nil
}
}
@@ -127,12 +129,12 @@ func (el *ExportLogger) Alert(m string) (err error) {
// Crit logs to EEs with critical level
func (el *ExportLogger) Crit(m string) (err error) {
if el.logLevel < LOGLEVEL_CRITICAL {
if el.LogLevel < LOGLEVEL_CRITICAL {
return nil
}
if el.call(m, LOGLEVEL_CRITICAL); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Crit(m)
NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
err = nil
}
}
@@ -141,12 +143,12 @@ func (el *ExportLogger) Crit(m string) (err error) {
// Debug logs to EEs with debug level
func (el *ExportLogger) Debug(m string) (err error) {
if el.logLevel < LOGLEVEL_DEBUG {
if el.LogLevel < LOGLEVEL_DEBUG {
return nil
}
if err = el.call(m, LOGLEVEL_DEBUG); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Debug(m)
NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
err = nil
}
}
@@ -155,12 +157,12 @@ func (el *ExportLogger) Debug(m string) (err error) {
// Emerg logs to EEs with emergency level
func (el *ExportLogger) Emerg(m string) (err error) {
if el.logLevel < LOGLEVEL_EMERGENCY {
if el.LogLevel < LOGLEVEL_EMERGENCY {
return nil
}
if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Emerg(m)
NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
err = nil
}
}
@@ -169,12 +171,12 @@ func (el *ExportLogger) Emerg(m string) (err error) {
// Err logs to EEs with error level
func (el *ExportLogger) Err(m string) (err error) {
if el.logLevel < LOGLEVEL_ERROR {
if el.LogLevel < LOGLEVEL_ERROR {
return nil
}
if err = el.call(m, LOGLEVEL_ERROR); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Err(m)
NewStdLogger(el.NodeID, el.LogLevel).Err(m)
err = nil
}
}
@@ -183,12 +185,12 @@ func (el *ExportLogger) Err(m string) (err error) {
// Info logs to EEs with info level
func (el *ExportLogger) Info(m string) (err error) {
if el.logLevel < LOGLEVEL_INFO {
if el.LogLevel < LOGLEVEL_INFO {
return nil
}
if err = el.call(m, LOGLEVEL_INFO); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Info(m)
NewStdLogger(el.NodeID, el.LogLevel).Info(m)
err = nil
}
}
@@ -197,12 +199,12 @@ func (el *ExportLogger) Info(m string) (err error) {
// Notice logs to EEs with notice level
func (el *ExportLogger) Notice(m string) (err error) {
if el.logLevel < LOGLEVEL_NOTICE {
if el.LogLevel < LOGLEVEL_NOTICE {
return nil
}
if err = el.call(m, LOGLEVEL_NOTICE); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Notice(m)
NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
err = nil
}
}
@@ -211,12 +213,12 @@ func (el *ExportLogger) Notice(m string) (err error) {
// Warning logs to EEs with warning level
func (el *ExportLogger) Warning(m string) (err error) {
if el.logLevel < LOGLEVEL_WARNING {
if el.LogLevel < LOGLEVEL_WARNING {
return nil
}
if err = el.call(m, LOGLEVEL_WARNING); err != nil {
if err == ErrLoggerChanged {
NewStdLogger(el.nodeID, el.logLevel).Warning(m)
NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
err = nil
}
}
@@ -225,12 +227,12 @@ func (el *ExportLogger) Warning(m string) (err error) {
func (el *ExportLogger) GetMeta() map[string]interface{} {
return map[string]interface{}{
Tenant: el.tenant,
NodeID: el.nodeID,
Level: el.logLevel,
Format: el.writer.Topic,
Conn: el.writer.Addr.String(),
FailedPostsDir: el.fldPostDir,
Attempts: el.writer.MaxAttempts,
Tenant: el.Tenant,
NodeID: el.NodeID,
Level: el.LogLevel,
Format: el.Writer.Topic,
Conn: el.Writer.Addr.String(),
FailedPostsDir: el.FldPostDir,
Attempts: el.Writer.MaxAttempts,
}
}