Implement rpc exporter

This commit is contained in:
ionutboangiu
2022-01-19 15:37:08 +02:00
committed by Dan Christian Bogos
parent f204d474bd
commit 14dfda53be
13 changed files with 291 additions and 16 deletions

View File

@@ -330,7 +330,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaNone, utils.MetaFileFWV,
utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap,
utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt, utils.MetaSQL, utils.MetaNatsjsonMap,
utils.MetaLog})
utils.MetaLog, utils.MetaRPC})
// LazySanityCheck used after check config sanity to display warnings related to the config
func (cfg *CGRConfig) LazySanityCheck() {

View File

@@ -551,6 +551,17 @@ const CGRATES_CFG_JSON = `
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
// "natsClientKey": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response
//RPC
// "rpcCodec": "", // for compression, encoding and decoding <internalRPC | BIRPC | JSON/HTTP/GOB>
// "serviceMethod": "", // the method that should be called trough RPC
// "keyPath": "" , // path to server key
// "certPath": "", // path to client certificate
// "caPath": "", // path to CA certificate
// "tls": false, //
// "connIDs": [], // connections for connManager to this exporter
// "rpcConnTimeout" : "1s", // connection unsuccesfull on timeout
// "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value
}, // extra options for exporter
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters

View File

@@ -192,6 +192,15 @@ type EventExporterOpts struct {
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
RPCCodec *string
ServiceMethod *string
KeyPath *string
CertPath *string
CAPath *string
TLS *bool
ConnIDs *[]string
RPCConnTimeout *time.Duration
RPCReplyTimeout *time.Duration
}
// EventExporterCfg the config for a Event Exporter
@@ -346,6 +355,41 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
}
eeOpts.NATSJetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait)
}
if jsnCfg.RPCCodec != nil {
eeOpts.RPCCodec = jsnCfg.RPCCodec
}
if jsnCfg.ServiceMethod != nil {
eeOpts.ServiceMethod = jsnCfg.ServiceMethod
}
if jsnCfg.KeyPath != nil {
eeOpts.KeyPath = jsnCfg.KeyPath
}
if jsnCfg.CertPath != nil {
eeOpts.CertPath = jsnCfg.CertPath
}
if jsnCfg.CAPath != nil {
eeOpts.CAPath = jsnCfg.CAPath
}
if jsnCfg.TLS != nil {
eeOpts.TLS = jsnCfg.TLS
}
if jsnCfg.ConnIDs != nil {
eeOpts.ConnIDs = jsnCfg.ConnIDs
}
if jsnCfg.RPCConnTimeout != nil {
var rpcConnTimeout time.Duration
if rpcConnTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCConnTimeout); err != nil {
return
}
eeOpts.RPCConnTimeout = utils.DurationPointer(rpcConnTimeout)
}
if jsnCfg.RPCReplyTimeout != nil {
var rpcReplyTimeout time.Duration
if rpcReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCReplyTimeout); err != nil {
return
}
eeOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout)
}
return
}
@@ -563,6 +607,33 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
if eeOpts.NATSJetStreamMaxWait != nil {
cln.NATSJetStreamMaxWait = utils.DurationPointer(*eeOpts.NATSJetStreamMaxWait)
}
if eeOpts.RPCCodec != nil {
cln.RPCCodec = utils.StringPointer(*eeOpts.RPCCodec)
}
if eeOpts.ServiceMethod != nil {
cln.ServiceMethod = utils.StringPointer(*eeOpts.ServiceMethod)
}
if eeOpts.KeyPath != nil {
cln.KeyPath = utils.StringPointer(*eeOpts.KeyPath)
}
if eeOpts.CertPath != nil {
cln.CertPath = utils.StringPointer(*eeOpts.CertPath)
}
if eeOpts.CAPath != nil {
cln.CAPath = utils.StringPointer(*eeOpts.CAPath)
}
if eeOpts.TLS != nil {
cln.TLS = utils.BoolPointer(*eeOpts.TLS)
}
if eeOpts.ConnIDs != nil {
cln.ConnIDs = utils.SliceStringPointer(*eeOpts.ConnIDs)
}
if eeOpts.RPCConnTimeout != nil {
cln.RPCConnTimeout = utils.DurationPointer(*eeOpts.RPCConnTimeout)
}
if eeOpts.RPCReplyTimeout != nil {
cln.RPCReplyTimeout = utils.DurationPointer(*eeOpts.RPCReplyTimeout)
}
return cln
}
@@ -731,6 +802,33 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if eeC.Opts.NATSJetStreamMaxWait != nil {
opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String()
}
if eeC.Opts.RPCCodec != nil {
opts[utils.RpcCodec] = *eeC.Opts.RPCCodec
}
if eeC.Opts.ServiceMethod != nil {
opts[utils.ServiceMethod] = *eeC.Opts.ServiceMethod
}
if eeC.Opts.KeyPath != nil {
opts[utils.KeyPath] = *eeC.Opts.KeyPath
}
if eeC.Opts.CertPath != nil {
opts[utils.CertPath] = *eeC.Opts.CertPath
}
if eeC.Opts.CAPath != nil {
opts[utils.CaPath] = *eeC.Opts.CAPath
}
if eeC.Opts.TLS != nil {
opts[utils.Tls] = *eeC.Opts.TLS
}
if eeC.Opts.ConnIDs != nil {
opts[utils.ConnIDs] = *eeC.Opts.ConnIDs
}
if eeC.Opts.RPCConnTimeout != nil {
opts[utils.RpcConnTimeout] = eeC.Opts.RPCConnTimeout.String()
}
if eeC.Opts.RPCReplyTimeout != nil {
opts[utils.RpcReplyTimeout] = eeC.Opts.RPCReplyTimeout.String()
}
flgs := eeC.Flags.SliceFlags()
if flgs == nil {

View File

@@ -308,6 +308,15 @@ type EventExporterOptsJson struct {
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
RPCCodec *string `json:"rpcCodec"`
ServiceMethod *string `json:"serviceMethod"`
KeyPath *string `json:"keyPath"`
CertPath *string `json:"certPath"`
CAPath *string `json:"caPath"`
ConnIDs *[]string `json:"connIDs"`
TLS *bool `json:"tls"`
RPCConnTimeout *string `json:"rpcConnTimeout"`
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
}
// EventExporterJsonCfg is the configuration of a single EventExporter

View File

@@ -40,7 +40,8 @@ type EventExporter interface {
}
// NewEventExporter produces exporters
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS) (ee EventExporter, err error) {
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
connMngr *engine.ConnManager) (ee EventExporter, err error) {
var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
cfg.Timezone,
@@ -77,6 +78,8 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
return NewSQLEe(cfg, dc)
case utils.MetaLog:
return NewLogEE(cfg, dc), nil
case utils.MetaRPC:
return NewRpcEE(cfg, dc, connMngr)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type)
}

View File

@@ -34,7 +34,7 @@ func TestNewEventExporter(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileCSV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "open /var/spool/cgrates/ees/*default_"
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -67,7 +67,7 @@ func TestNewEventExporterCase2(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileFWV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "open /var/spool/cgrates/ees/*default_"
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -97,7 +97,7 @@ func TestNewEventExporterCase3(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -122,7 +122,7 @@ func TestNewEventExporterCase4(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPjsonMap
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -147,7 +147,7 @@ func TestNewEventExporterCase6(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaVirt
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -172,7 +172,7 @@ func TestNewEventExporterDefaultCase(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaNone
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := fmt.Sprintf("unsupported exporter type: <%s>", utils.MetaNone)
if err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
@@ -186,7 +186,7 @@ func TestNewEventExporterCase7(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
cgrCfg.EEsCfg().Exporters[0].ExportPath = "/invalid/path"
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -216,7 +216,7 @@ func TestNewEventExporterCase8(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQL
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "MANDATORY_IE_MISSING: [sqlTableName]"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
@@ -227,7 +227,7 @@ func TestNewEventExporterCase8(t *testing.T) {
func TestNewEventExporterDcCase(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.GeneralCfg().DefaultTimezone = "invalid_timezone"
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil, nil)
errExpect := "unknown time zone invalid_timezone"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)

View File

@@ -179,7 +179,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
}
if !isCached {
if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS); err != nil {
if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil {
return
}
if hasCache {

View File

@@ -292,7 +292,7 @@ func TestV1ProcessEvent4(t *testing.T) {
utils.MetaHTTPPost: ltcache.NewCache(1,
time.Second, false, onCacheEvicted),
}
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS)
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil)
if err != nil {
t.Error(err)
}

View File

@@ -168,7 +168,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil); err != nil {
}, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }

View File

@@ -53,7 +53,7 @@ func TestNatsEEJetStream(t *testing.T) {
break
}
}
evExp, err := NewEventExporter(cfg, cgrCfg, nil)
evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -143,7 +143,7 @@ func TestNatsEE(t *testing.T) {
break
}
}
evExp, err := NewEventExporter(cfg, cgrCfg, nil)
evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil)
if err != nil {
t.Fatal(err)
}

137
ees/rpc.go Normal file
View File

@@ -0,0 +1,137 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
connMgr *engine.ConnManager) (e *RPCee, err error) {
e = &RPCee{
cfg: cfg,
dc: dc,
connMgr: connMgr,
}
err = e.parseOpts()
return
}
type RPCee struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
connMgr *engine.ConnManager
//opts
codec string
serviceMethod string
tls bool
keyPath string
certPath string
caPath string
connIDs []string
connTimeout time.Duration
replyTimeout time.Duration
sync.RWMutex // protect connection
}
func (e *RPCee) Cfg() (eCfg *config.EventExporterCfg) {
return e.cfg
}
func (e *RPCee) Connect() (err error) {
return
}
func (e *RPCee) ExportEvent(args interface{}, _ string) (err error) {
e.Lock()
defer e.Unlock()
var rply string
return e.connMgr.Call(e.connIDs, nil, e.serviceMethod, args, &rply)
}
func (e *RPCee) Close() (err error) {
e.Lock()
defer e.Unlock()
e.connMgr = nil
return
}
func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) {
return e.dc
}
func (e *RPCee) PrepareMap(mp map[string]interface{}) (interface{}, error) {
return mp, nil
}
func (e *RPCee) PrepareOrderMap(oMp *utils.OrderedNavigableMap) (interface{}, error) {
mP := make(map[string]interface{})
for i := oMp.GetFirstElement(); i != nil; i = i.Next() {
path := i.Value
val, _ := oMp.Field(path)
if val.AttributeID != utils.EmptyString {
continue
}
path = path[:len(path)-1] // remove the last index
opath := strings.Join(path, utils.NestingSep)
if _, has := mP[opath]; !has {
mP[opath] = val.Data // first item which is not an attribute will become the value
}
}
return mP, nil
}
func (e *RPCee) parseOpts() (err error) {
if e.cfg.Opts.RPCCodec != nil {
e.codec = *e.cfg.Opts.RPCCodec
}
if e.cfg.Opts.ServiceMethod != nil {
e.serviceMethod = *e.cfg.Opts.ServiceMethod
}
if e.cfg.Opts.KeyPath != nil {
e.keyPath = *e.cfg.Opts.KeyPath
}
if e.cfg.Opts.CertPath != nil {
e.certPath = *e.cfg.Opts.CertPath
}
if e.cfg.Opts.CAPath != nil {
e.caPath = *e.cfg.Opts.CAPath
}
if e.cfg.Opts.TLS != nil {
e.tls = *e.cfg.Opts.TLS
}
if e.cfg.Opts.ConnIDs != nil {
e.connIDs = *e.cfg.Opts.ConnIDs
}
if e.cfg.Opts.RPCConnTimeout != nil {
e.connTimeout = *e.cfg.Opts.RPCConnTimeout
}
if e.cfg.Opts.RPCReplyTimeout != nil {
e.replyTimeout = *e.cfg.Opts.RPCReplyTimeout
}
return
}

View File

@@ -311,6 +311,7 @@ const (
MetaAMQPjsonCDR = "*amqp_json_cdr"
MetaAMQPjsonMap = "*amqp_json_map"
MetaAMQPV1jsonMap = "*amqpv1_json_map"
MetaRPC = "*rpc"
MetaSQSjsonMap = "*sqs_json_map"
MetaKafkajsonMap = "*kafka_json_map"
MetaNatsjsonMap = "*nats_json_map"
@@ -2602,6 +2603,7 @@ const (
ElsVersionLow = "elsVersion"
ElsVersionType = "elsVersionType"
ElsWaitForActiveShards = "elsWaitForActiveShards"
// nats
NatsSubject = "natsSubject"
NatsQueueID = "natsQueueID"
@@ -2614,6 +2616,17 @@ const (
NatsJetStream = "natsJetStream"
NatsJetStreamMaxWait = "natsJetStreamMaxWait"
// rpc
RpcCodec = "rpcCodec"
ServiceMethod = "serviceMethod"
KeyPath = "keyPath"
CertPath = "certPath"
CaPath = "caPath"
Tls = "tls"
ConnIDs = "connIDs"
RpcConnTimeout = "rpcConnTimeout"
RpcReplyTimeout = "rpcReplyTimeout"
// processed opts
AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed"
AMQPExchangeProcessedCfg = "amqpExchangeProcessed"

View File

@@ -535,6 +535,10 @@ func DurationPointer(d time.Duration) *time.Duration {
return &d
}
func SliceStringPointer(d []string) *[]string {
return &d
}
func ToIJSON(v interface{}) string {
b, _ := json.MarshalIndent(v, "", " ")
return string(b)