From e0ae593949f0e5bd5bc7f6ea69f8081a6dfb5252 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 3 Apr 2024 18:40:59 +0300 Subject: [PATCH] Set up internal connection to ERs --- cmd/cgr-engine/cgr-engine.go | 13 +++++++++---- utils/consts.go | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7abf0615f..655beb164 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -154,7 +154,7 @@ func startRPC(server *cores.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, - internalEEsChan chan birpc.ClientConnector, + internalEEsChan, internalERsChan chan birpc.ClientConnector, shdChan *utils.SyncedChan) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -186,6 +186,8 @@ func startRPC(server *cores.Server, internalRaterChan, internalCacheSChan <- chS case eeS := <-internalEEsChan: internalEEsChan <- eeS + case erS := <-internalERsChan: + internalERsChan <- erS case <-shdChan.Done(): return } @@ -487,6 +489,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs): internalERsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, @@ -626,10 +629,9 @@ func main() { ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz, srvDep) - erS := services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep) srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, - apiSv1, apiSv2, cdrS, smg, coreS, erS, + apiSv1, apiSv2, cdrS, smg, coreS, services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep), services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), services.NewKamailioAgent(cfg, shdChan, connManager, srvDep), @@ -640,6 +642,8 @@ func main() { ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, internalEEsChan, anz, srvDep), + services.NewEventReaderService(cfg, filterSChan, + shdChan, connManager, server, internalERsChan, anz, srvDep), services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), ) srvManager.StartServices() @@ -676,6 +680,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan) engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan) + engine.IntRPC.AddInternalRPCClient(utils.ErSv1, internalERsChan) engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) err = initConfigSv1(internalConfigChan, server, anz) @@ -693,7 +698,7 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, shdChan) + internalCacheSChan, internalEEsChan, internalERsChan, shdChan) <-shdChan.Done() shtdDone := make(chan struct{}) diff --git a/utils/consts.go b/utils/consts.go index 8aa592306..98f3662a0 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -413,6 +413,7 @@ const ( MetaUCH = "*uch" MetaGuardian = "*guardians" MetaEEs = "*ees" + MetaERs = "*ers" MetaContinue = "*continue" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task"