From 9b8ac3199b3bf11f5a9e00adf42e716c0097355a Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 25 Sep 2024 18:47:46 +0300 Subject: [PATCH] Revise EfSv1.ReplayEvents API + tests - renamed parameter type: ArgsReplyFailedPosts -> ReplayEventsParams - renamed param fields: - FailedRequestsInDir -> SourcePath - FailedRequestsOutDir -> FailedPath - TypeProvider -> Provider - changed param fields types from *string to string - used the SourcePath and FailedPath params directly instead of creating separate variables - used filepath.WalkDir instead of reading the directory and looping over the entries - used slices.ContainsFunc to check if the file belongs to any module (if 1+ is specified) - used filepath.Join instead of path.Join - used the path provided by WalkFunc instead of building the file paths ourselves - made error returns more descriptive - added logs for directories/files that are skipped - paths that cannot be accessed are skipped after logging the error --- data/conf/cgrates/cgrates.json | 2 + dispatchers/efs.go | 5 +- dispatchers/ers.go | 3 +- ees/libcdre.go | 308 ------------------------ ees/libcdre_it_test.go | 106 -------- ees/libcdre_test.go | 156 ------------ efs/efs.go | 96 ++++---- efs/efs_it_test.go | 187 +++++++++++++- efs/failed_ees.go | 16 +- efs/{failed_loggs.go => failed_logs.go} | 48 ++-- efs/libefs.go | 43 ++-- go.mod | 1 + utils/coreutils.go | 8 - utils/failover_export.go | 178 -------------- 14 files changed, 303 insertions(+), 854 deletions(-) delete mode 100644 ees/libcdre.go delete mode 100644 ees/libcdre_it_test.go delete mode 100644 ees/libcdre_test.go rename efs/{failed_loggs.go => failed_logs.go} (68%) delete mode 100644 utils/failover_export.go diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 0f0adfe1c..ff29f72a5 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -1080,6 +1080,8 @@ // "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> // "store_uncompressed_limit": 0, // used to compress data // "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|$rpc_conns_id> +// "ees_conns": [], // connections to EEs for StatUpdates, empty to disable export functionality: <""|*internal|$rpc_conns_id> +// "ees_exporter_ids": [], // list of EventExporter profiles to use for real-time StatUpdate exports // "indexed_selects": true, // enable profile matching exclusively on indexes // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing diff --git a/dispatchers/efs.go b/dispatchers/efs.go index cda1baf9f..a1c4cbb35 100644 --- a/dispatchers/efs.go +++ b/dispatchers/efs.go @@ -21,6 +21,7 @@ package dispatchers import ( "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/utils" ) @@ -51,9 +52,9 @@ func (dS *DispatcherService) EfSv1ProcessEvent(ctx *context.Context, args *utils } return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaEFs, utils.EfSv1ProcessEvent, args, reply) } -func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) (err error) { +func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args efs.ReplayEventsParams, reply *string) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant - if args != nil && len(args.Tenant) != 0 { + if len(args.Tenant) != 0 { tnt = args.Tenant } ev := make(map[string]any) diff --git a/dispatchers/ers.go b/dispatchers/ers.go index 2f4973e95..996479918 100644 --- a/dispatchers/ers.go +++ b/dispatchers/ers.go @@ -21,6 +21,7 @@ package dispatchers import ( "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/utils" ) @@ -39,7 +40,7 @@ func (dS *DispatcherService) ErSv1Ping(ctx *context.Context, args *utils.CGREven } return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1Ping, args, reply) } -func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args utils.StringWithAPIOpts, reply *string) (err error) { +func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args ers.V1RunReaderParams, reply *string) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if len(args.Tenant) != 0 { tnt = args.Tenant diff --git a/ees/libcdre.go b/ees/libcdre.go deleted file mode 100644 index d64dc19c8..000000000 --- a/ees/libcdre.go +++ /dev/null @@ -1,308 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ees - -/* -// NewFailoverPosterFromFile returns ExportEvents from the file -// used only on replay failed post -func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) { - var fileContent []byte - err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error { - if fileContent, err = os.ReadFile(filePath); err != nil { - return err - } - return os.Remove(filePath) - }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) - if err != nil { - return - } - dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) - // unmarshall it - expEv := new(utils.FailedExportersLogg) - err = dec.Decode(&expEv) - switch providerType { - case utils.EEs: - opts, err := AsOptsEESConfig(expEv.Opts) - if err != nil { - return nil, err - } - failPoster = &FailedExportersEEs{ - module: expEv.Module, - failedPostsDir: expEv.FailedPostsDir, - Path: expEv.Path, - Opts: opts, - Events: expEv.Events, - Format: expEv.Format, - } - case utils.Kafka: - failPoster = expEv - } - return -} - -func AsOptsEESConfig(opts map[string]any) (*config.EventExporterOpts, error) { - optsCfg := new(config.EventExporterOpts) - if len(opts) == 0 { - return optsCfg, nil - } - if _, has := opts[utils.CSVFieldSepOpt]; has { - optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt)) - } - if _, has := opts[utils.ElsIndex]; has { - optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex)) - } - if _, has := opts[utils.ElsIfPrimaryTerm]; has { - x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm) - if err != nil { - return nil, err - } - optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x) - } - if _, has := opts[utils.ElsIfSeqNo]; has { - x, err := utils.IfaceAsInt(utils.ElsIfSeqNo) - if err != nil { - return nil, err - } - optsCfg.ElsIfSeqNo = utils.IntPointer(x) - } - if _, has := opts[utils.ElsOpType]; has { - optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType)) - } - if _, has := opts[utils.ElsPipeline]; has { - optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline)) - } - if _, has := opts[utils.ElsRouting]; has { - optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting)) - } - if _, has := opts[utils.ElsTimeout]; has { - t, err := utils.IfaceAsDuration(utils.ElsTimeout) - if err != nil { - return nil, err - } - optsCfg.ElsTimeout = &t - } - if _, has := opts[utils.ElsVersionLow]; has { - x, err := utils.IfaceAsInt(utils.ElsVersionLow) - if err != nil { - return nil, err - } - optsCfg.ElsVersion = utils.IntPointer(x) - } - if _, has := opts[utils.ElsVersionType]; has { - optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType)) - } - if _, has := opts[utils.ElsWaitForActiveShards]; has { - optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards)) - } - if _, has := opts[utils.SQLMaxIdleConnsCfg]; has { - x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg) - if err != nil { - return nil, err - } - optsCfg.SQLMaxIdleConns = utils.IntPointer(x) - } - if _, has := opts[utils.SQLMaxOpenConns]; has { - x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns) - if err != nil { - return nil, err - } - optsCfg.SQLMaxOpenConns = utils.IntPointer(x) - } - if _, has := opts[utils.SQLConnMaxLifetime]; has { - t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime) - if err != nil { - return nil, err - } - optsCfg.SQLConnMaxLifetime = &t - } - if _, has := opts[utils.MYSQLDSNParams]; has { - optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string) - } - if _, has := opts[utils.SQLTableNameOpt]; has { - optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt)) - } - if _, has := opts[utils.SQLDBNameOpt]; has { - optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt)) - } - if _, has := opts[utils.PgSSLModeCfg]; has { - optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg)) - } - if _, has := opts[utils.KafkaTopic]; has { - optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic)) - } - if _, has := opts[utils.AMQPQueueID]; has { - optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID)) - } - if _, has := opts[utils.AMQPRoutingKey]; has { - optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey)) - } - if _, has := opts[utils.AMQPExchange]; has { - optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange)) - } - if _, has := opts[utils.AMQPExchangeType]; has { - optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType)) - } - if _, has := opts[utils.AWSRegion]; has { - optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion)) - } - if _, has := opts[utils.AWSKey]; has { - optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey)) - } - if _, has := opts[utils.AWSSecret]; has { - optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret)) - } - if _, has := opts[utils.AWSToken]; has { - optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken)) - } - if _, has := opts[utils.SQSQueueID]; has { - optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID)) - } - if _, has := opts[utils.S3Bucket]; has { - optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket)) - } - if _, has := opts[utils.S3FolderPath]; has { - optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath)) - } - if _, has := opts[utils.NatsJetStream]; has { - x, err := utils.IfaceAsBool(utils.NatsJetStream) - if err != nil { - return nil, err - } - optsCfg.NATSJetStream = utils.BoolPointer(x) - } - if _, has := opts[utils.NatsSubject]; has { - optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject)) - } - if _, has := opts[utils.NatsJWTFile]; has { - optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile)) - } - if _, has := opts[utils.NatsSeedFile]; has { - optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile)) - } - if _, has := opts[utils.NatsCertificateAuthority]; has { - optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority)) - } - if _, has := opts[utils.NatsClientCertificate]; has { - optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate)) - } - if _, has := opts[utils.NatsClientKey]; has { - optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey)) - } - if _, has := opts[utils.NatsJetStreamMaxWait]; has { - t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait) - if err != nil { - return nil, err - } - optsCfg.NATSJetStreamMaxWait = &t - } - if _, has := opts[utils.RpcCodec]; has { - optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec)) - } - if _, has := opts[utils.ServiceMethod]; has { - optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod)) - } - if _, has := opts[utils.KeyPath]; has { - optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath)) - } - if _, has := opts[utils.CertPath]; has { - optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath)) - } - if _, has := opts[utils.CaPath]; has { - optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath)) - } - if _, has := opts[utils.Tls]; has { - x, err := utils.IfaceAsBool(utils.Tls) - if err != nil { - return nil, err - } - optsCfg.TLS = utils.BoolPointer(x) - } - if _, has := opts[utils.ConnIDs]; has { - optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string) - } - if _, has := opts[utils.RpcConnTimeout]; has { - t, err := utils.IfaceAsDuration(utils.RpcConnTimeout) - if err != nil { - return nil, err - } - optsCfg.RPCConnTimeout = &t - } - if _, has := opts[utils.RpcReplyTimeout]; has { - t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout) - if err != nil { - return nil, err - } - optsCfg.RPCReplyTimeout = &t - } - if _, has := opts[utils.RPCAPIOpts]; has { - optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]any) - } - return optsCfg, nil -} - -// FailedExportersEEs used to save the failed post to file -type FailedExportersEEs struct { - lk sync.RWMutex - Path string - Opts *config.EventExporterOpts - Format string - Events []any - failedPostsDir string - module string -} - -// AddEvent adds one event -func (expEv *FailedExportersEEs) AddEvent(ev any) { - expEv.lk.Lock() - expEv.Events = append(expEv.Events, ev) - expEv.lk.Unlock() -} - -// ReplayFailedPosts tryies to post cdrs again -func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) { - eesFailedEvents := &FailedExportersEEs{ - Path: expEv.Path, - Opts: expEv.Opts, - Format: expEv.Format, - } - - eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, - utils.MetaNone, attempts, expEv.Opts) - var ee EventExporter - if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { - return - } - keyFunc := func() string { return utils.EmptyString } - if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap { - keyFunc = utils.UUIDSha1Prefix - } - for _, ev := range expEv.Events { - if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil { - eesFailedEvents.AddEvent(ev) - } - } - ee.Close() - if len(eesFailedEvents.Events) > 0 { - err = utils.ErrPartiallyExecuted - } else { - eesFailedEvents = nil - } - return -} -*/ diff --git a/ees/libcdre_it_test.go b/ees/libcdre_it_test.go deleted file mode 100644 index ac4f07ee4..000000000 --- a/ees/libcdre_it_test.go +++ /dev/null @@ -1,106 +0,0 @@ -//go:build integration -// +build integration - -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ees - -/* -func TestWriteFldPosts(t *testing.T) { - // can't convert - var notanExportEvent string - writeFailedPosts("somestring", notanExportEvent) - // can convert & write - dir := "/tmp/engine/libcdre_test/" - exportEvent := &ExportEvents{ - failedPostsDir: dir, - module: "module", - } - if err := os.RemoveAll(dir); err != nil { - t.Fatal("Error removing folder: ", dir, err) - } - if err := os.MkdirAll(dir, 0755); err != nil { - t.Fatal("Error creating folder: ", dir, err) - } - config.CgrConfig().GeneralCfg().FailedPostsDir = dir - writeFailedPosts("itmID", exportEvent) - - if filename, err := filepath.Glob(filepath.Join(dir, "module|*.gob")); err != nil { - t.Error(err) - } else if len(filename) == 0 { - t.Error("Expecting one file") - } else if len(filename) > 1 { - t.Error("Expecting only one file") - } -} - -func TestWriteToFile(t *testing.T) { - filePath := "/tmp/engine/libcdre_test/writeToFile.txt" - exportEvent := &ExportEvents{} - //call WriteToFile function - if err := exportEvent.WriteToFile(filePath); err != nil { - t.Error(err) - } - // check if the file exists / throw error if the file doesn't exist - if _, err := os.Stat(filePath); os.IsNotExist(err) { - t.Fatalf("File doesn't exists") - } - //check if the file was written correctly - rcv, err := NewExportEventsFromFile(filePath) - if err != nil { - t.Errorf("Error deconding the file content: %+v", err) - } - if !reflect.DeepEqual(rcv, exportEvent) { - t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv)) - } - //populate the exportEvent struct - exportEvent = &ExportEvents{ - Events: []any{"something1", "something2"}, - Path: "path", - Format: "test", - } - filePath = "/tmp/engine/libcdre_test/writeToFile2.txt" - if err := exportEvent.WriteToFile(filePath); err != nil { - t.Error(err) - } - // check if the file exists / throw error if the file doesn't exist - if _, err := os.Stat(filePath); os.IsNotExist(err) { - t.Fatalf("File doesn't exists") - } - //check if the file was written correctly - rcv, err = NewExportEventsFromFile(filePath) - if err != nil { - t.Errorf("Error deconding the file content: %+v", err) - } - if !reflect.DeepEqual(rcv, exportEvent) { - t.Errorf("Expected: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv)) - } - //wrong path *reading - exportEvent = &ExportEvents{} - filePath = "/tmp/engine/libcdre_test/wrongpath.txt" - if _, err = NewExportEventsFromFile(filePath); err == nil || err.Error() != "open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory" { - t.Errorf("Expecting: 'open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory',\nReceived: '%+v'", err) - } - //wrong path *writing - filePath = utils.EmptyString - if err := exportEvent.WriteToFile(filePath); err == nil || err.Error() != "open : no such file or directory" { - t.Error(err) - } -} -*/ diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go deleted file mode 100644 index 6903d3988..000000000 --- a/ees/libcdre_test.go +++ /dev/null @@ -1,156 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ees - -/* -func TestSetFldPostCacheTTL(t *testing.T) { - var1 := failedPostCache - SetFailedPostCacheTTL(50 * time.Millisecond) - var2 := failedPostCache - if reflect.DeepEqual(var1, var2) { - t.Error("Expecting to be different") - } -} - -func TestAddFldPost(t *testing.T) { - SetFailedPostCacheTTL(5 * time.Second) - AddFailedPost("", "path1", "format1", "module1", "1", &config.EventExporterOpts{}) - x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1")) - if !ok { - t.Error("Error reading from cache") - } - if x == nil { - t.Error("Received an empty element") - } - - failedPost, canCast := x.(*ExportEvents) - if !canCast { - t.Error("Error when casting") - } - eOut := &ExportEvents{ - Path: "path1", - Format: "format1", - module: "module1", - Events: []any{"1"}, - Opts: &config.EventExporterOpts{}, - } - if !reflect.DeepEqual(eOut, failedPost) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) - } - AddFailedPost("", "path1", "format1", "module1", "2", &config.EventExporterOpts{}) - AddFailedPost("", "path2", "format2", "module2", "3", &config.EventExporterOpts{ - SQSQueueID: utils.StringPointer("qID"), - }) - x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1")) - if !ok { - t.Error("Error reading from cache") - } - if x == nil { - t.Error("Received an empty element") - } - failedPost, canCast = x.(*ExportEvents) - if !canCast { - t.Error("Error when casting") - } - eOut = &ExportEvents{ - Path: "path1", - Format: "format1", - module: "module1", - Events: []any{"1", "2"}, - Opts: &config.EventExporterOpts{}, - } - if !reflect.DeepEqual(eOut, failedPost) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) - } - x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path2", "format2", "module2", "qID")) - if !ok { - t.Error("Error reading from cache") - } - if x == nil { - t.Error("Received an empty element") - } - failedPost, canCast = x.(*ExportEvents) - if !canCast { - t.Error("Error when casting") - } - eOut = &ExportEvents{ - Path: "path2", - Format: "format2", - module: "module2", - Events: []any{"3"}, - Opts: &config.EventExporterOpts{ - SQSQueueID: utils.StringPointer("qID"), - }, - } - if !reflect.DeepEqual(eOut, failedPost) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) - } - for _, id := range failedPostCache.GetItemIDs("") { - failedPostCache.Set(id, nil, nil) - } -} - -func TestFilePath(t *testing.T) { - exportEvent := &ExportEvents{} - rcv := exportEvent.FilePath() - if rcv[0] != '|' { - t.Errorf("Expecting: '|', received: %+v", rcv[0]) - } else if rcv[8:] != ".gob" { - t.Errorf("Expecting: '.gob', received: %+v", rcv[8:]) - } - exportEvent = &ExportEvents{ - module: "module", - } - rcv = exportEvent.FilePath() - if rcv[:7] != "module|" { - t.Errorf("Expecting: 'module|', received: %+v", rcv[:7]) - } else if rcv[14:] != ".gob" { - t.Errorf("Expecting: '.gob', received: %+v", rcv[14:]) - } - -} - -func TestSetModule(t *testing.T) { - exportEvent := &ExportEvents{} - eOut := &ExportEvents{ - module: "module", - } - exportEvent.SetModule("module") - if !reflect.DeepEqual(eOut, exportEvent) { - t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent) - } -} - -func TestAddEvent(t *testing.T) { - exportEvent := &ExportEvents{} - eOut := &ExportEvents{Events: []any{"event1"}} - exportEvent.AddEvent("event1") - if !reflect.DeepEqual(eOut, exportEvent) { - t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent) - } - exportEvent = &ExportEvents{} - eOut = &ExportEvents{Events: []any{"event1", "event2", "event3"}} - exportEvent.AddEvent("event1") - exportEvent.AddEvent("event2") - exportEvent.AddEvent("event3") - if !reflect.DeepEqual(eOut, exportEvent) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(exportEvent)) - } -} -*/ diff --git a/efs/efs.go b/efs/efs.go index bc9faf283..d066c1fff 100644 --- a/efs/efs.go +++ b/efs/efs.go @@ -19,8 +19,10 @@ along with this program. If not, see package efs import ( - "os" - "path" + "fmt" + "io/fs" + "path/filepath" + "slices" "strings" "sync" @@ -76,14 +78,14 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts } case utils.Kafka: } - var failedPost *FailedExportersLogg + var failedPost *FailedExportersLog if x, ok := failedPostCache.Get(key); ok { if x != nil { - failedPost = x.(*FailedExportersLogg) + failedPost = x.(*FailedExportersLog) } } if failedPost == nil { - failedPost = &FailedExportersLogg{ + failedPost = &FailedExportersLog{ Path: args.Path, Format: format, Opts: args.APIOpts, @@ -97,56 +99,64 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts return nil } +// ReplayEventsParams contains parameters for replaying failed posts. +type ReplayEventsParams struct { + Tenant string + Provider string // source of failed posts + SourcePath string // path for events to be replayed + FailedPath string // path for events that failed to replay, *none to discard, defaults to SourceDir if empty + Modules []string // list of modules to replay requests for, nil for all +} + // V1ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again. -func (efS *EfS) V1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) error { - failedPostsDir := efS.cfg.EFsCfg().FailedPostsDir - if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString { - failedPostsDir = *args.FailedRequestsInDir +func (efS *EfS) V1ReplayEvents(ctx *context.Context, args ReplayEventsParams, reply *string) error { + + // Set default directories if not provided. + if args.SourcePath == "" { + args.SourcePath = efS.cfg.EFsCfg().FailedPostsDir } - failedOutDir := failedPostsDir - if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != utils.EmptyString { - failedOutDir = *args.FailedRequestsOutDir + if args.FailedPath == "" { + args.FailedPath = args.SourcePath } - // check all the files in the FailedPostsInDirectory - filesInDir, err := os.ReadDir(failedPostsDir) - if err != nil { - return err - } - if len(filesInDir) == 0 { - return utils.ErrNotFound - } - // check every file and check if any of them match the modules - for _, file := range filesInDir { - if len(args.Modules) != 0 { - var allowedModule bool - for _, module := range args.Modules { - if strings.HasPrefix(file.Name(), module) { - allowedModule = true - break - } - } - if !allowedModule { - continue - } + + if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> failed to access path %s: %v", utils.EFs, path, err)) + return nil // skip paths that cause an error } - filePath := path.Join(failedPostsDir, file.Name()) - var expEv FailoverPoster - if expEv, err = NewFailoverPosterFromFile(filePath, args.TypeProvider, efS); err != nil { - return err + if d.IsDir() { + return nil // skip directories } - // check if the failed out dir path is the same as the same in dir in order to export again in case of failure + + // Skip files not belonging to the specified modules. + if len(args.Modules) != 0 && !slices.ContainsFunc(args.Modules, func(mod string) bool { + return strings.HasPrefix(d.Name(), mod) + }) { + utils.Logger.Info(fmt.Sprintf("<%s> skipping file %s: not found within specified modules", utils.EFs, d.Name())) + return nil + } + + expEv, err := NewFailoverPosterFromFile(path, args.Provider, efS) + if err != nil { + return fmt.Errorf("failed to init failover poster from %s: %v", path, err) + } + + // Determine the failover path. failoverPath := utils.MetaNone - if failedOutDir != utils.MetaNone { - failoverPath = path.Join(failedOutDir, file.Name()) + if args.FailedPath != utils.MetaNone { + failoverPath = filepath.Join(args.FailedPath, d.Name()) } err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant) - if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves + if err != nil && failoverPath != utils.MetaNone { + // Write the events that failed to be replayed to the failover directory. if err = WriteToFile(failoverPath, expEv); err != nil { - return utils.NewErrServerError(err) + return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err) } } - + return nil + }); err != nil { + return utils.NewErrServerError(err) } *reply = utils.OK return nil diff --git a/efs/efs_it_test.go b/efs/efs_it_test.go index d0851c5e7..c37d7c639 100644 --- a/efs/efs_it_test.go +++ b/efs/efs_it_test.go @@ -25,6 +25,7 @@ import ( "encoding/gob" "fmt" "os" + "os/exec" "path" "testing" "time" @@ -35,6 +36,9 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/nats-io/nats.go" ) var ( @@ -70,7 +74,7 @@ func TestDecodeExportEvents(t *testing.T) { } dec := gob.NewDecoder(bytes.NewBuffer(content)) gob.Register(new(utils.CGREvent)) - singleEvent := new(FailedExportersLogg) + singleEvent := new(FailedExportersLog) if err := dec.Decode(&singleEvent); err != nil { t.Error(err) } else { @@ -183,3 +187,184 @@ func testEfsSKillEngine(t *testing.T) { t.Error(err) } } + +// TestEFsReplayEvents tests the implementation of the EfSv1.ReplayEvents. +func TestEFsReplayEvents(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + failedDir := t.TempDir() + content := fmt.Sprintf(`{ +"data_db": { + "db_type": "*internal" +}, +"stor_db": { + "db_type": "*internal" +}, +"admins": { + "enabled": true +}, +"efs": { + "enabled": true, + "failed_posts_ttl": "3ms", + "poster_attempts": 1 +}, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "nats_exporter", + "type": "*natsJSONMap", + "flags": ["*log"], + "efs_conns": ["*localhost"], + "export_path": "nats://localhost:4222", + "attempts": 1, + "failed_posts_dir": "%s", + "synchronous": true, + "opts": { + "natsSubject": "processed_cdrs", + }, + "fields":[ + {"tag": "TestField", "path": "*exp.TestField", "type": "*variable", "value": "~*req.TestField"}, + ] + } + ] +} +}`, failedDir) + + testEnv := engine.TestEnvironment{ + ConfigJSON: content, + // LogBuffer: &bytes.Buffer{}, + Encoding: *utils.Encoding, + } + // defer fmt.Println(testEnv.LogBuffer) + client, _ := testEnv.Setup(t, context.Background()) + // helper to sort slices + less := func(a, b string) bool { return a < b } + // amount of events to export/replay + count := 5 + t.Run("successful nats export", func(t *testing.T) { + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start nats-server: %v", err) + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second)) + if err != nil { + t.Fatalf("failed to connect to nats-server: %v", err) + } + defer nc.Drain() + ch := make(chan *nats.Msg, count) + sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch) + if err != nil { + t.Fatalf("failed to subscribe to nats queue: %v", err) + } + var reply map[string]map[string]any + for i := range count { + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, &utils.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "test", + Event: map[string]any{ + "TestField": i, + }, + }, + }, &reply); err != nil { + t.Errorf("EeSv1.ProcessEvent returned unexpected err: %v", err) + } + } + time.Sleep(1 * time.Millisecond) // wait for the channel to receive the replayed exports + want := make([]string, 0, count) + for i := range count { + want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i)) + } + if err := sub.Unsubscribe(); err != nil { + t.Errorf("failed to unsubscribe from nats subject: %v", err) + } + close(ch) + got := make([]string, 0, count) + for elem := range ch { + got = append(got, string(elem.Data)) + } + if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" { + t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff) + } + }) + t.Run("replay failed nats export", func(t *testing.T) { + t.Skip("skipping due to gob decoding err") + var exportReply map[string]map[string]any + for i := range count { + err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &utils.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "test", + Event: map[string]any{ + "TestField": i, + }, + }, + }, &exportReply) + if err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() { + t.Errorf("EeSv1.ProcessEvent err = %v, want %v", err, utils.ErrPartiallyExecuted) + } + } + time.Sleep(5 * time.Millisecond) + replayFailedDir := t.TempDir() + var replayReply string + if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{ + Tenant: "cgrates.org", + Provider: utils.EEs, + SourcePath: failedDir, + FailedPath: replayFailedDir, + Modules: []string{"test", "EEs"}, + }, &replayReply); err != nil { + t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err) + } + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start nats-server: %v", err) + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second)) + if err != nil { + t.Fatalf("failed to connect to nats-server: %v", err) + } + defer nc.Drain() + ch := make(chan *nats.Msg, count) + sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch) + if err != nil { + t.Fatalf("failed to subscribe to nats queue: %v", err) + } + if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{ + Tenant: "cgrates.org", + Provider: utils.EEs, + SourcePath: replayFailedDir, + FailedPath: utils.MetaNone, + Modules: []string{"test", "EEs"}, + }, &replayReply); err != nil { + t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err) + } + time.Sleep(time.Millisecond) // wait for the channel to receive the replayed exports + want := make([]string, 0, count) + for i := range count { + want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i)) + } + if err := sub.Unsubscribe(); err != nil { + t.Errorf("failed to unsubscribe from nats subject: %v", err) + } + close(ch) + got := make([]string, 0, count) + for elem := range ch { + got = append(got, string(elem.Data)) + } + if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" { + t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff) + } + }) +} diff --git a/efs/failed_ees.go b/efs/failed_ees.go index db934abda..80fe050a4 100644 --- a/efs/failed_ees.go +++ b/efs/failed_ees.go @@ -250,7 +250,7 @@ func (expEv *FailedExportersEEs) AddEvent(ev any) { // ReplayFailedPosts tryies to post cdrs again func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) { - eesFailedEvents := &FailedExportersEEs{ + failedEvents := &FailedExportersEEs{ Path: expEv.Path, Opts: expEv.Opts, Format: expEv.Format, @@ -268,14 +268,16 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempt } for _, ev := range expEv.Events { if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil { - eesFailedEvents.AddEvent(ev) + failedEvents.AddEvent(ev) } } ee.Close() - if len(eesFailedEvents.Events) > 0 { - err = utils.ErrPartiallyExecuted - } else { - eesFailedEvents = nil + switch len(failedEvents.Events) { + case 0: // none failed to be replayed + return nil + case len(expEv.Events): // all failed, return last encountered error + return err + default: + return utils.ErrPartiallyExecuted } - return } diff --git a/efs/failed_loggs.go b/efs/failed_logs.go similarity index 68% rename from efs/failed_loggs.go rename to efs/failed_logs.go index 2074329dd..938bb2337 100644 --- a/efs/failed_loggs.go +++ b/efs/failed_logs.go @@ -31,8 +31,8 @@ import ( "github.com/segmentio/kafka-go" ) -// FailedExportersLogg is a failover poster for kafka logger type -type FailedExportersLogg struct { +// FailedExportersLog is a failover poster for kafka logger type +type FailedExportersLog struct { lk sync.RWMutex Path string Opts map[string]any // this is meta @@ -46,50 +46,51 @@ type FailedExportersLogg struct { } // AddEvent adds one event -func (expEv *FailedExportersLogg) AddEvent(ev any) { +func (expEv *FailedExportersLog) AddEvent(ev any) { expEv.lk.Lock() + defer expEv.lk.Unlock() expEv.Events = append(expEv.Events, ev) - expEv.lk.Unlock() } // NewExportEventsFromFile returns ExportEvents from the file // used only on replay failed post -func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) { - var fileContent []byte - if fileContent, err = os.ReadFile(filePath); err != nil { +func NewExportEventsFromFile(filePath string) (*FailedExportersLog, error) { + content, err := os.ReadFile(filePath) + if err != nil { return nil, err } - if err = os.Remove(filePath); err != nil { + if err := os.Remove(filePath); err != nil { return nil, err } - dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) - // unmarshall it - expEv = new(FailedExportersLogg) - err = dec.Decode(&expEv) - return + var expEv FailedExportersLog + dec := gob.NewDecoder(bytes.NewBuffer(content)) + if err := dec.Decode(&expEv); err != nil { + return nil, err + } + return &expEv, nil } // ReplayFailedPosts tryies to post cdrs again -func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) { +func (expEv *FailedExportersLog) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) error { nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID]) logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level]) if err != nil { - return + return err } expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl, expEv.connMngr, expEv.cfg) for _, event := range expEv.Events { - var content []byte - if content, err = utils.ToUnescapedJSON(event); err != nil { - return + content, err := utils.ToUnescapedJSON(event) + if err != nil { + return err } - if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{ + if err := expLogger.Writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(utils.GenUUID()), Value: content, }); err != nil { var reply string // if there are any errors in kafka, we will post in FailedPostDirectory - if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent, + return expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent, &utils.ArgsFailedPosts{ Tenant: tnt, Path: expLogger.Writer.Addr.String(), @@ -97,11 +98,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp FailedDir: expLogger.FldPostDir, Module: utils.Kafka, APIOpts: expLogger.GetMeta(), - }, &reply); err != nil { - return err - } - return nil + }, &reply) } } - return err + return nil } diff --git a/efs/libefs.go b/efs/libefs.go index 29095dadf..2b0125cd9 100644 --- a/efs/libefs.go +++ b/efs/libefs.go @@ -21,6 +21,7 @@ package efs import ( "bytes" "encoding/gob" + "errors" "fmt" "os" "path" @@ -45,23 +46,22 @@ func SetFailedPostCacheTTL(ttl time.Duration) { } func writeFailedPosts(_ string, value any) { - expEv, canConvert := value.(*FailedExportersLogg) + expEv, canConvert := value.(*FailedExportersLog) if !canConvert { return } filePath := expEv.FilePath() expEv.lk.RLock() + defer expEv.lk.RUnlock() if err := WriteToFile(filePath, expEv); err != nil { utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>", filePath, err)) - expEv.lk.RUnlock() return } - expEv.lk.RUnlock() } // FilePath returns the file path it should use for saving the failed events -func (expEv *FailedExportersLogg) FilePath() string { +func (expEv *FailedExportersLog) FilePath() string { return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix) } @@ -84,28 +84,32 @@ func WriteToFile(filePath string, expEv FailoverPoster) (err error) { // NewFailoverPosterFromFile returns ExportEvents from the file // used only on replay failed post -func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) { - var fileContent []byte - err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error { - if fileContent, err = os.ReadFile(filePath); err != nil { - return err +func NewFailoverPosterFromFile(filePath, provider string, efs *EfS) (FailoverPoster, error) { + var content []byte + err := guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error { + var readErr error + if content, readErr = os.ReadFile(filePath); readErr != nil { + return readErr } return os.Remove(filePath) }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) if err != nil { - return + return nil, err } - dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) - // unmarshall it - expEv := new(FailedExportersLogg) - err = dec.Decode(&expEv) - switch providerType { + + dec := gob.NewDecoder(bytes.NewBuffer(content)) + var expEv FailedExportersLog + if err := dec.Decode(&expEv); err != nil { + return nil, err + } + + switch provider { case utils.EEs: opts, err := AsOptsEESConfig(expEv.Opts) if err != nil { return nil, err } - failPoster = &FailedExportersEEs{ + return &FailedExportersEEs{ module: expEv.Module, failedPostsDir: expEv.FailedPostsDir, Path: expEv.Path, @@ -114,11 +118,12 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos Format: expEv.Format, connMngr: efs.connMgr, - } + }, nil case utils.Kafka: expEv.cfg = efs.cfg expEv.connMngr = efs.connMgr - failPoster = expEv + return &expEv, nil + default: + return nil, errors.New("invalid provider") } - return } diff --git a/go.mod b/go.mod index cc006fd44..f810eafd8 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/fiorix/go-diameter/v4 v4.0.4 github.com/fsnotify/fsnotify v1.7.0 github.com/go-sql-driver/mysql v1.8.1 + github.com/google/go-cmp v0.6.0 github.com/mediocregopher/radix/v3 v3.8.1 github.com/miekg/dns v1.1.62 github.com/nats-io/nats-server/v2 v2.10.18 diff --git a/utils/coreutils.go b/utils/coreutils.go index c0e1e7f57..a775dd92a 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -823,14 +823,6 @@ type ArgsFailedPosts struct { APIOpts map[string]any // Specially for the meta } -type ArgsReplayFailedPosts struct { - Tenant string - TypeProvider string - FailedRequestsInDir *string // if defined it will be our source of requests to be replayed - FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded - Modules []string // list of modules for which replay the requests, nil for all -} - // GetIndexesArg the API argumets to specify an index type GetIndexesArg struct { IdxItmType string diff --git a/utils/failover_export.go b/utils/failover_export.go deleted file mode 100644 index b64ee3378..000000000 --- a/utils/failover_export.go +++ /dev/null @@ -1,178 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package utils - -/* -var failedPostCache *ltcache.Cache - -func init() { - failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts) -} - -// SetFailedPostCacheTTL recreates the failed cache -func SetFailedPostCacheTTL(ttl time.Duration) { - failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts) -} - -func writeFailedPosts(_ string, value any) { - expEv, canConvert := value.(*FailedExportersLogg) - if !canConvert { - return - } - filePath := expEv.FilePath() - expEv.lk.RLock() - if err := WriteToFile(filePath, expEv); err != nil { - Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>", - filePath, err)) - expEv.lk.RUnlock() - return - } - expEv.lk.RUnlock() -} - -// FilePath returns the file path it should use for saving the failed events -func (expEv *FailedExportersLogg) FilePath() string { - return path.Join(expEv.FailedPostsDir, expEv.Module+PipeSep+UUIDSha1Prefix()+GOBSuffix) -} - -// WriteToFile writes the events to file -func WriteToFile(filePath string, expEv FailoverPoster) (err error) { - fileOut, err := os.Create(filePath) - if err != nil { - return err - } - encd := gob.NewEncoder(fileOut) - gob.Register(new(CGREvent)) - err = encd.Encode(expEv) - fileOut.Close() - return -} - -type FailedExportersLogg struct { - lk sync.RWMutex - Path string - Opts map[string]any // THIS WILL BE META - Format string - Events []any - FailedPostsDir string - Module string -} - -func AddFailedMessage(failedPostsDir, expPath, format, - module string, ev any, opts map[string]any) { - key := ConcatenatedKey(failedPostsDir, expPath, format, module) - switch module { - case EEs: - // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id - var amqpQueueID string - var s3BucketID string - var sqsQueueID string - var kafkaTopic string - if _, has := opts[AMQPQueueID]; has { - amqpQueueID = IfaceAsString(opts[AMQPQueueID]) - } - if _, has := opts[S3Bucket]; has { - s3BucketID = IfaceAsString(opts[S3Bucket]) - } - if _, has := opts[SQSQueueID]; has { - sqsQueueID = IfaceAsString(opts[SQSQueueID]) - } - if _, has := opts[kafkaTopic]; has { - kafkaTopic = IfaceAsString(opts[KafkaTopic]) - } - if qID := FirstNonEmpty(amqpQueueID, s3BucketID, - sqsQueueID, kafkaTopic); len(qID) != 0 { - key = ConcatenatedKey(key, qID) - } - case Kafka: - } - var failedPost *FailedExportersLogg - if x, ok := failedPostCache.Get(key); ok { - if x != nil { - failedPost = x.(*FailedExportersLogg) - } - } - if failedPost == nil { - failedPost = &FailedExportersLogg{ - Path: expPath, - Format: format, - Opts: opts, - Module: module, - FailedPostsDir: failedPostsDir, - } - failedPostCache.Set(key, failedPost, nil) - } - failedPost.AddEvent(ev) -} - -// AddEvent adds one event -func (expEv *FailedExportersLogg) AddEvent(ev any) { - expEv.lk.Lock() - expEv.Events = append(expEv.Events, ev) - expEv.lk.Unlock() -} - -// NewExportEventsFromFile returns ExportEvents from the file -// used only on replay failed post -func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) { - var fileContent []byte - if fileContent, err = os.ReadFile(filePath); err != nil { - return nil, err - } - if err = os.Remove(filePath); err != nil { - return nil, err - } - dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) - // unmarshall it - expEv = new(FailedExportersLogg) - err = dec.Decode(&expEv) - return -} - -type FailoverPoster interface { - ReplayFailedPosts(int, string) error -} - -// ReplayFailedPosts tryies to post cdrs again -func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) { - nodeID := IfaceAsString(expEv.Opts[NodeID]) - logLvl, err := IfaceAsInt(expEv.Opts[Level]) - if err != nil { - return - } - expLogger := NewExportLogger(nodeID, tnt, logLvl, - expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir) - for _, event := range expEv.Events { - var content []byte - if content, err = ToUnescapedJSON(event); err != nil { - return - } - if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{ - Key: []byte(GenUUID()), - Value: content, - }); err != nil { - // if there are any errors in kafka, we will post in FailedPostDirectory - AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka, - event, expLogger.GetMeta()) - return nil - } - } - return err -} -*/